diff options
Diffstat (limited to 'src/mongo/s/catalog')
17 files changed, 462 insertions, 63 deletions
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 2f3ee76fcbf..366e6c63644 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -274,13 +274,20 @@ public: BSONObjBuilder* result) = 0; /** - * Runs a read-only command on a single config server. + * Runs a read-only command on a config server. */ virtual bool runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) = 0; /** + * Runs a user management related read-only command on a config server. + */ + virtual bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) = 0; + + /** * Applies oplog entries to the config servers. * Used by mergeChunk, splitChunk, and moveChunk commands. * diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 7e4275409f1..ab2cf24bfc9 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -133,12 +133,18 @@ bool CatalogManagerMock::runUserManagementWriteCommand(const string& commandName return true; } -bool CatalogManagerMock::runReadCommand(const string& dbname, +bool CatalogManagerMock::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { return true; } +bool CatalogManagerMock::runUserManagementReadCommand(const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return true; +} + Status CatalogManagerMock::applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) { return Status::OK(); diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 90e25993fcb..83bcab351e2 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -94,9 +94,13 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; - bool runReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + virtual bool runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 4996af0c543..24b710ec14b 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -879,7 +879,13 @@ bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandNa return Command::appendCommandStatus(*result, status); } -bool CatalogManagerLegacy::runReadCommand(const string& dbname, +bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return runReadCommand(dbname, cmdObj, result); +} + +bool CatalogManagerLegacy::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { try { diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index 7e202d8aa2c..068b8555d91 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -101,6 +101,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 2e7d7d72f4a..2cae356706a 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -35,6 +35,7 @@ env.Library( 'catalog_manager_replica_set.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/read_concern_args', '$BUILD_DIR/mongo/s/catalog/catalog_manager', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', '$BUILD_DIR/mongo/s/client/sharding_client', @@ -58,6 +59,7 @@ env.CppUnitTest( 'catalog_manager_replica_set', '$BUILD_DIR/mongo/client/remote_command_targeter_mock', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 6296253463a..1590821c415 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -45,8 +45,10 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/network_interface.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_actionlog.h" @@ -86,8 +88,11 @@ namespace { // Until read committed is supported always write to the primary with majority write and read // from the secondary. That way we ensure that reads will see a consistent data. -const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryPreferred, TagSet{}); - +// TODO: switch back to SecondaryPreferred once SERVER-19675 is fixed +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::PrimaryOnly, TagSet{}); +const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred, + TagSet{}); +const BSONObj kReplMetadata(BSON(rpc::kReplicationMetadataFieldName << 1)); const int kInitialSSVRetries = 3; const int kActionLogCollectionSize = 1024 * 1024 * 2; const int kChangeLogCollectionSize = 1024 * 1024 * 10; @@ -169,7 +174,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, return readHost.getStatus(); } - auto countStatus = _runCountCommand( + auto countStatus = _runCountCommandOnConfig( readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::ns(ns))); if (!countStatus.isOK()) { return countStatus.getStatus(); @@ -247,10 +252,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC } // Check preconditions for removing the shard - auto countStatus = - _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << NE << name << ShardType::draining(true))); + auto countStatus = _runCountCommandOnConfig( + readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << NE << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -259,9 +264,9 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC "Can't have more than one draining shard at a time"); } - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << NE << name)); + countStatus = _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << NE << name)); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -270,9 +275,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC } // Figure out if shard is already draining - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << name << ShardType::draining(true))); + countStatus = + _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -300,16 +306,16 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC // Draining has already started, now figure out how many chunks and databases are still on the // shard. - countStatus = _runCountCommand( + countStatus = _runCountCommandOnConfig( readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } const long long chunkCount = countStatus.getValue(); - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::primary(name))); + countStatus = _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::primary(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -452,8 +458,7 @@ void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) { if (_actionLogCollectionCreated.load() == 0) { BSONObj createCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true << "size" << kActionLogCollectionSize); - auto result = - grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd); + auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd); if (!result.isOK()) { LOG(1) << "couldn't create actionlog collection: " << causedBy(result.getStatus()); return; @@ -481,8 +486,7 @@ void CatalogManagerReplicaSet::logChange(const string& clientAddress, if (_changeLogCollectionCreated.load() == 0) { BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size" << kChangeLogCollectionSize); - auto result = - grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd); + auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd); if (!result.isOK()) { LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus()); return; @@ -735,7 +739,7 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); } - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj); + auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj); if (!response.isOK()) { return Command::appendCommandStatus(*result, response.getStatus()); } @@ -746,26 +750,23 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& bool CatalogManagerReplicaSet::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - auto target = targeter->findHost(kConfigReadSelector); - if (!target.isOK()) { - return Command::appendCommandStatus(*result, target.getStatus()); - } + BSONObjBuilder cmdBuilder; + cmdBuilder.appendElements(cmdObj); + _appendReadConcern(&cmdBuilder); - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!resultStatus.isOK()) { - return Command::appendCommandStatus(*result, resultStatus.getStatus()); - } - - result->appendElements(resultStatus.getValue()); + return _runReadCommand(dbname, cmdBuilder.done(), kConfigReadSelector, result); +} - return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); +bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return _runReadCommand(dbname, cmdObj, kConfigPrimaryPreferredSelector, result); } Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) { BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition); - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", cmd); + auto response = _runCommandOnConfigWithNotMasterRetries("config", cmd); if (!response.isOK()) { return response.getStatus(); @@ -792,7 +793,7 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandReque invariant(dbname == "config" || dbname == "admin"); const BSONObj cmdObj = batchRequest.toBSON(); - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj); + auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj); if (!response.isOK()) { _toBatchError(response.getStatus(), batchResponse); return; @@ -896,11 +897,16 @@ StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() const return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); } -StatusWith<long long> CatalogManagerReplicaSet::_runCountCommand(const HostAndPort& target, - const NamespaceString& ns, - BSONObj query) { - BSONObj countCmd = BSON("count" << ns.coll() << "query" << query); - auto responseStatus = grid.shardRegistry()->runCommand(target, ns.db().toString(), countCmd); +StatusWith<long long> CatalogManagerReplicaSet::_runCountCommandOnConfig(const HostAndPort& target, + const NamespaceString& ns, + BSONObj query) { + BSONObjBuilder countBuilder; + countBuilder.append("count", ns.coll()); + countBuilder.append("query", query); + _appendReadConcern(&countBuilder); + + auto responseStatus = _runCommandOnConfig(target, ns.db().toString(), countBuilder.done()); + if (!responseStatus.isOK()) { return responseStatus.getStatus(); } @@ -995,8 +1001,7 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() { } if (queryResults.empty()) { - auto cmdStatus = - grid.shardRegistry()->runCommand(readHost, "admin", BSON("listDatabases" << 1)); + auto cmdStatus = _runCommandOnConfig(readHost, "admin", BSON("listDatabases" << 1)); if (!cmdStatus.isOK()) { return cmdStatus.getStatus(); } @@ -1036,4 +1041,76 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() { return versionTypeResult.getValue(); } +StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfig(const HostAndPort& target, + const string& dbName, + BSONObj cmdObj) { + auto result = + grid.shardRegistry()->runCommandWithMetadata(target, dbName, cmdObj, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + const auto& response = result.getValue(); + + _updateLastSeenConfigOpTime(response.opTime); + + return response.response; +} + +StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRetries( + const std::string& dbName, BSONObj cmdObj) { + auto result = grid.shardRegistry()->runCommandWithNotMasterRetries( + "config", dbName, cmdObj, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + const auto& response = result.getValue(); + + _updateLastSeenConfigOpTime(response.opTime); + + return response.response; +} + +repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _configOpTime; +} + +void CatalogManagerReplicaSet::_updateLastSeenConfigOpTime(const repl::OpTime& optime) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_configOpTime < optime) { + _configOpTime = optime; + } +} + +void CatalogManagerReplicaSet::_appendReadConcern(BSONObjBuilder* builder) { + repl::ReadConcernArgs readConcern(_getConfigOpTime(), + repl::ReadConcernLevel::kMajorityReadConcern); + readConcern.appendInfo(builder); +} + +bool CatalogManagerReplicaSet::_runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + const ReadPreferenceSetting& settings, + BSONObjBuilder* result) { + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + auto target = targeter->findHost(settings); + if (!target.isOK()) { + return Command::appendCommandStatus(*result, target.getStatus()); + } + + auto resultStatus = _runCommandOnConfig(target.getValue(), dbname, cmdObj); + if (!resultStatus.isOK()) { + return Command::appendCommandStatus(*result, resultStatus.getStatus()); + } + + result->appendElements(resultStatus.getValue()); + + return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index ef9abf9e734..18c2bec94b1 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/client/connection_string.h" +#include "mongo/db/repl/optime.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/stdx/mutex.h" @@ -36,6 +37,7 @@ namespace mongo { class NamespaceString; +struct ReadPreferenceSetting; class VersionType; /** @@ -102,6 +104,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; @@ -126,36 +132,74 @@ private: StatusWith<std::string> _generateNewShardName() const override; + bool _runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + const ReadPreferenceSetting& settings, + BSONObjBuilder* result); + /** * Helper method for running a count command against a given target server with appropriate * error handling. */ - StatusWith<long long> _runCountCommand(const HostAndPort& target, - const NamespaceString& ns, - BSONObj query); + StatusWith<long long> _runCountCommandOnConfig(const HostAndPort& target, + const NamespaceString& ns, + BSONObj query); + + StatusWith<BSONObj> _runCommandOnConfig(const HostAndPort& target, + const std::string& dbName, + BSONObj cmdObj); + + StatusWith<BSONObj> _runCommandOnConfigWithNotMasterRetries(const std::string& dbName, + BSONObj cmdObj); + + /** + * Appends a read committed read concern to the request object. + */ + void _appendReadConcern(BSONObjBuilder* builder); /** * Returns the current cluster schema/protocol version. */ StatusWith<VersionType> _getConfigVersion(); + /** + * Returns the highest last known config server opTime. + */ + repl::OpTime _getConfigOpTime(); + + /** + * Updates the last known config server opTime if the given opTime is newer. + */ + void _updateLastSeenConfigOpTime(const repl::OpTime& optime); + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (F) Self synchronizing. + // (M) Must hold _mutex for access. + // (R) Read only, can only be written during initialization. + // + + stdx::mutex _mutex; + // Config server connection string - ConnectionString _configServerConnectionString; + ConnectionString _configServerConnectionString; // (R) // Distribted lock manager singleton. - std::unique_ptr<DistLockManager> _distLockManager; + std::unique_ptr<DistLockManager> _distLockManager; // (R) // Whether the logAction call should attempt to create the actionlog collection - AtomicInt32 _actionLogCollectionCreated; + AtomicInt32 _actionLogCollectionCreated; // (F) // Whether the logChange call should attempt to create the changelog collection - AtomicInt32 _changeLogCollectionCreated; - - // protects _inShutdown - stdx::mutex _mutex; + AtomicInt32 _changeLogCollectionCreated; // (F) // True if shutDown() has been called. False, otherwise. - bool _inShutdown = false; + bool _inShutdown = false; // (M) + + // Last known highest opTime from the config server. + repl::OpTime _configOpTime; // (M) }; } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index 0dbbeaab975..14c5b87da11 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -78,6 +78,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("isdbgrid" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus( responseBuilder, Status(ErrorCodes::CommandNotFound, "isdbgrid command not found")); @@ -90,6 +92,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("isMaster" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ismaster" << true); }); @@ -98,6 +102,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("replSetGetStatus" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus( responseBuilder, @@ -116,6 +122,8 @@ protected: const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -185,6 +193,8 @@ TEST_F(AddShardTest, AddShardStandalone) { ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONArrayBuilder arr; arr.append(BSON("name" @@ -290,6 +300,8 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONArrayBuilder arr; arr.append(BSON("name" diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp index 0a053edde55..d168b62edd0 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp @@ -92,15 +92,20 @@ public: ASSERT_EQ(_dropNS.db(), request.dbname); ASSERT_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ns" << _dropNS.ns() << "ok" << 1); }); } void expectRemoveChunksAndMarkCollectionDropped() { onCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); ASSERT_EQ(_configHost, request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ delete: "chunks", deletes: [{ q: { ns: "test.user" }, limit: 0 }], @@ -132,6 +137,8 @@ public: ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("unsetSharding" << 1), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("n" << 1 << "ok" << 1); }); } @@ -219,6 +226,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) { ASSERT_EQ(dropNS().db(), request.dbname); ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound); }); @@ -227,6 +236,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) { ASSERT_EQ(dropNS().db(), request.dbname); ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound); }); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp index f0cf6218fa8..cbb0ded71d6 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp @@ -60,6 +60,9 @@ public: void expectActionLogCreate(const BSONObj& response) { onCommand([&response](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCreateCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true << "size" << 1024 * 1024 * 2); ASSERT_EQUALS(expectedCreateCmd, request.cmdObj); @@ -72,6 +75,8 @@ public: onCommand([&expectedActionLog](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp index 970fadff866..dc12eeb5a7d 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp @@ -146,6 +146,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -318,6 +320,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedDeleteRequest actualBatchedDelete; std::string errmsg; ASSERT_TRUE(actualBatchedDelete.parseBSON(request.dbname, request.cmdObj, &errmsg)); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp index 6fc42a28dd8..3c0a59a68ec 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp @@ -105,6 +105,8 @@ public: ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -173,6 +175,8 @@ public: ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -729,6 +733,8 @@ TEST_F(ShardCollectionTest, withInitialData) { ASSERT_EQUALS(0, request.cmdObj["maxSplitPoints"].numberLong()); ASSERT_EQUALS(0, request.cmdObj["maxChunkObjects"].numberLong()); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "splitKeys" << BSON_ARRAY(splitPoint0 << splitPoint1 << splitPoint2 << splitPoint3)); }); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index e08996461f5..8d295a29e6c 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h" @@ -64,6 +65,7 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using executor::TaskExecutor; +using rpc::ReplSetMetadata; using std::string; using std::vector; using stdx::chrono::milliseconds; @@ -482,8 +484,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) { auto future = launchAsync([this] { BSONObjBuilder responseBuilder; - bool ok = - catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder); + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); ASSERT_TRUE(ok); BSONObj response = responseBuilder.obj(); @@ -493,6 +495,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) { }); onCommand([](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + ASSERT_EQUALS("test", request.dbname); ASSERT_EQUALS(BSON("usersInfo" << 1), request.cmdObj); @@ -508,7 +512,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommandUnsatisfiedReadPre Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up")); BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder); + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); ASSERT_FALSE(ok); Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); @@ -571,6 +576,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandSuccess) { << "test"), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::UserNotFound, "User test@test not found")); @@ -669,6 +676,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandNotMasterRetrySuc << "test"), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return BSON("ok" << 1); }); @@ -1212,6 +1221,8 @@ TEST_F(CatalogManagerReplSetTest, UpdateDatabase) { onCommand([dbt](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1284,6 +1295,9 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecated) { onCommand([updateOps, preCondition](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj()); ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj()); @@ -1316,6 +1330,8 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedCommandFailed) { ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj()); ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj()); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::BadValue, "precondition failed")); @@ -1399,6 +1415,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 10); }); @@ -1409,6 +1427,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 1); }); @@ -1419,6 +1439,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 100); }); @@ -1427,6 +1449,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1657,6 +1681,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 10); }); @@ -1667,6 +1693,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 1); }); @@ -1677,6 +1705,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 100); }); @@ -1685,6 +1715,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1765,6 +1797,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return fromjson(R"({ databases: [], totalSize: 1, @@ -1776,6 +1810,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ update: "databases", updates: [{ @@ -1871,6 +1907,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingDBExists) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ update: "databases", updates: [{ @@ -1948,5 +1986,128 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExistsNoShards) { future.timed_get(kFutureTimeout); } +TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + repl::OpTime lastOpTime; + for (int x = 0; x < 3; x++) { + auto future = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE( + catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + const repl::OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5); + + onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + // Now wait for the runReadCommand call to return + future.timed_get(kFutureTimeout); + + lastOpTime = newOpTime; + } +} + +TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + // Initialize the internal config OpTime + auto future1 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + repl::OpTime highestOpTime; + const repl::OpTime newOpTime(Timestamp(7, 6), 5); + + onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future1.timed_get(kFutureTimeout); + + highestOpTime = newOpTime; + + // Return an older OpTime + auto future2 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + + onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, oldOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future2.timed_get(kFutureTimeout); + + // Check that older OpTime does not override highest OpTime + auto future3 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, oldOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future3.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp index 767555bcc13..3b0036418a5 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp @@ -40,6 +40,7 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/lite_parsed_query.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" @@ -179,6 +180,11 @@ void CatalogManagerReplSetTestFixture::onCommand(NetworkTestEnv::OnCommandFuncti _networkTestEnv->onCommand(func); } +void CatalogManagerReplSetTestFixture::onCommandWithMetadata( + NetworkTestEnv::OnCommandWithMetadataFunction func) { + _networkTestEnv->onCommandWithMetadata(func); +} + void CatalogManagerReplSetTestFixture::onFindCommand(NetworkTestEnv::OnFindCommandFunction func) { _networkTestEnv->onFindCommand(func); } @@ -312,6 +318,7 @@ void CatalogManagerReplSetTestFixture::expectUpdateCollection(const HostAndPort& const CollectionType& coll) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(expectedHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); ASSERT_EQUALS("config", request.dbname); BatchedUpdateRequest actualBatchedUpdate; @@ -342,6 +349,7 @@ void CatalogManagerReplSetTestFixture::expectSetShardVersion( const ChunkVersion& expectedChunkVersion) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(expectedHost, request.target); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); SetShardVersionRequest ssv = assertGet(SetShardVersionRequest::parseFromBSON(request.cmdObj)); @@ -379,10 +387,32 @@ void CatalogManagerReplSetTestFixture::expectCount(const HostAndPort& configHost return BSON("ok" << 1 << "n" << response.getValue()); } + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, response.getStatus()); return responseBuilder.obj(); }); } +void CatalogManagerReplSetTestFixture::checkReadConcern(const BSONObj& cmdObj, + const Timestamp& expectedTS, + long long expectedTerm) const { + auto readConcernElem = cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]; + ASSERT_EQ(Object, readConcernElem.type()); + + auto readConcernObj = readConcernElem.Obj(); + ASSERT_EQ("majority", readConcernObj[repl::ReadConcernArgs::kLevelFieldName].str()); + + auto afterElem = readConcernObj[repl::ReadConcernArgs::kOpTimeFieldName]; + ASSERT_EQ(Object, afterElem.type()); + + auto afterObj = afterElem.Obj(); + + ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTimestampFieldName)); + ASSERT_EQ(expectedTS, afterObj[repl::ReadConcernArgs::kOpTimestampFieldName].timestamp()); + ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTermFieldName)); + ASSERT_EQ(expectedTerm, afterObj[repl::ReadConcernArgs::kOpTermFieldName].numberLong()); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h index 5a1fce08408..67d8d678637 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h @@ -94,6 +94,7 @@ protected: * single request + response or find tests. */ void onCommand(executor::NetworkTestEnv::OnCommandFunction func); + void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func); void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func); /** @@ -157,6 +158,13 @@ protected: void shutdownExecutor(); + /** + * Checks that the given command has the expected settings for read after opTime. + */ + void checkReadConcern(const BSONObj& cmdObj, + const Timestamp& expectedTS, + long long expectedTerm) const; + private: std::unique_ptr<ServiceContext> _service; ServiceContext::UniqueClient _client; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp index 51afc7b40b3..c20e98a4ef9 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp @@ -151,6 +151,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" } @@ -164,6 +166,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -210,6 +214,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" }, @@ -224,6 +230,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -255,6 +263,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeWriteError) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" } @@ -296,6 +306,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocNonEmptyConfigServer ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" }, |