diff options
author | Randolph Tan <randolph@10gen.com> | 2015-07-29 17:24:35 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2015-08-06 09:58:01 -0400 |
commit | 1cfa49411655640a5b1d2da60573c3f01d3c7c38 (patch) | |
tree | ad5b8f6c570905441a51fb283910125f6ebfae65 | |
parent | 1eee2b3a8c079c15ddc79e03e1b1d16b37d427d2 (diff) | |
download | mongo-1cfa49411655640a5b1d2da60573c3f01d3c7c38.tar.gz |
SERVER-19390 Make config server read commands do read committed
28 files changed, 572 insertions, 79 deletions
diff --git a/buildscripts/resmokelib/core/programs.py b/buildscripts/resmokelib/core/programs.py index cce38bcd054..929157dbf9e 100644 --- a/buildscripts/resmokelib/core/programs.py +++ b/buildscripts/resmokelib/core/programs.py @@ -58,6 +58,11 @@ def mongod_program(logger, executable=None, process_kwargs=None, **kwargs): else: kwargs[opt_name] = shortcut_opts[opt_name] + # Override the storage engine specified on the command line with "wiredTiger" if running a + # config server replica set. + if "replSet" in kwargs and "configsvr" in kwargs: + kwargs["storageEngine"] = "wiredTiger" + # Apply the rest of the command line arguments. _apply_kwargs(args, kwargs) diff --git a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py index 5224b67cb6f..c70eb81b596 100644 --- a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py +++ b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py @@ -4,6 +4,7 @@ Sharded cluster fixture for executing JSTests against. from __future__ import absolute_import +import copy import os.path import time @@ -171,10 +172,11 @@ class ShardedClusterFixture(interface.Fixture): logger_name = "%s:configsvr" % (self.logger.name) mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) - mongod_options = self.mongod_options.copy() + mongod_options = copy.deepcopy(self.mongod_options) mongod_options["configsvr"] = "" mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "config") mongod_options["replSet"] = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME + mongod_options["set_parameters"]["enableReplSnapshotThread"] = 1 return replicaset.ReplicaSetFixture(mongod_logger, self.job_num, @@ -193,7 +195,7 @@ class ShardedClusterFixture(interface.Fixture): logger_name = "%s:shard%d" % (self.logger.name, index) mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger) - mongod_options = self.mongod_options.copy() + mongod_options = copy.deepcopy(self.mongod_options) mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "shard%d" % (index)) return standalone.MongoDFixture(mongod_logger, @@ -211,7 +213,7 @@ class ShardedClusterFixture(interface.Fixture): logger_name = "%s:mongos" % (self.logger.name) mongos_logger = logging.loggers.new_logger(logger_name, parent=self.logger) - mongos_options = self.mongos_options.copy() + mongos_options = copy.deepcopy(self.mongos_options) if self.separate_configsvr: configdb_replset = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME configdb_port = self.configsvr.port diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp index bb62e56168c..b71cf0cf08c 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp @@ -69,7 +69,8 @@ Status AuthzManagerExternalStateMongos::getStoredAuthorizationVersion(OperationC // that runs this command BSONObj getParameterCmd = BSON("getParameter" << 1 << authSchemaVersionServerParameter << 1); BSONObjBuilder builder; - const bool ok = grid.catalogManager()->runReadCommand("admin", getParameterCmd, &builder); + const bool ok = + grid.catalogManager()->runUserManagementReadCommand("admin", getParameterCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -94,7 +95,8 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn << userName.getDB())) << "showPrivileges" << true << "showCredentials" << true); BSONObjBuilder builder; - const bool ok = grid.catalogManager()->runReadCommand("admin", usersInfoCmd, &builder); + const bool ok = + grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -123,7 +125,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN << roleName.getRole() << AuthorizationManager::ROLE_DB_FIELD_NAME << roleName.getDB())) << "showPrivileges" << showPrivileges); BSONObjBuilder builder; - const bool ok = grid.catalogManager()->runReadCommand("admin", rolesInfoCmd, &builder); + const bool ok = + grid.catalogManager()->runUserManagementReadCommand("admin", rolesInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -150,7 +153,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri BSONObj rolesInfoCmd = BSON("rolesInfo" << 1 << "showPrivileges" << showPrivileges << "showBuiltinRoles" << showBuiltinRoles); BSONObjBuilder builder; - const bool ok = grid.catalogManager()->runReadCommand(dbname, rolesInfoCmd, &builder); + const bool ok = + grid.catalogManager()->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -164,7 +168,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri bool AuthzManagerExternalStateMongos::hasAnyPrivilegeDocuments(OperationContext* txn) { BSONObj usersInfoCmd = BSON("usersInfo" << 1); BSONObjBuilder builder; - const bool ok = grid.catalogManager()->runReadCommand("admin", usersInfoCmd, &builder); + const bool ok = + grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder); if (!ok) { // If we were unable to complete the query, // it's best to assume that there _are_ privilege documents. This might happen diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp index 2e44a9ff65a..dd44b200f60 100644 --- a/src/mongo/db/auth/user_cache_invalidator_job.cpp +++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp @@ -91,7 +91,7 @@ public: StatusWith<OID> getCurrentCacheGeneration() { try { BSONObjBuilder result; - const bool ok = grid.catalogManager()->runReadCommand( + const bool ok = grid.catalogManager()->runUserManagementReadCommand( "admin", BSON("_getUserCacheGeneration" << 1), &result); if (!ok) { return Command::getStatusFromCommandResult(result.obj()); diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp index c65d1adcf2a..64241e72c12 100644 --- a/src/mongo/executor/network_test_env.cpp +++ b/src/mongo/executor/network_test_env.cpp @@ -64,6 +64,17 @@ void NetworkTestEnv::onCommand(OnCommandFunction func) { _mockNetwork->exitNetwork(); } +void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) { + _mockNetwork->enterNetwork(); + + const NetworkInterfaceMock::NetworkOperationIterator noi = _mockNetwork->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), func(request)); + _mockNetwork->runReadyNetworkOperations(); + + _mockNetwork->exitNetwork(); +} + void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) { onCommand([&func](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { const auto& resultStatus = func(request); diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h index b68b15a5a7f..6ef0f242597 100644 --- a/src/mongo/executor/network_test_env.h +++ b/src/mongo/executor/network_test_env.h @@ -129,8 +129,9 @@ public: std::move(future), _executor, _mockNetwork}; } - using OnCommandFunction = stdx::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>; + using OnCommandWithMetadataFunction = + stdx::function<StatusWith<RemoteCommandResponse>(const RemoteCommandRequest&)>; using OnFindCommandFunction = stdx::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>; @@ -146,6 +147,7 @@ public: * single request + response or find tests. */ void onCommand(OnCommandFunction func); + void onCommandWithMetadata(OnCommandWithMetadataFunction func); void onFindCommand(OnFindCommandFunction func); private: diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 2f3ee76fcbf..366e6c63644 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -274,13 +274,20 @@ public: BSONObjBuilder* result) = 0; /** - * Runs a read-only command on a single config server. + * Runs a read-only command on a config server. */ virtual bool runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) = 0; /** + * Runs a user management related read-only command on a config server. + */ + virtual bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) = 0; + + /** * Applies oplog entries to the config servers. * Used by mergeChunk, splitChunk, and moveChunk commands. * diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 7e4275409f1..ab2cf24bfc9 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -133,12 +133,18 @@ bool CatalogManagerMock::runUserManagementWriteCommand(const string& commandName return true; } -bool CatalogManagerMock::runReadCommand(const string& dbname, +bool CatalogManagerMock::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { return true; } +bool CatalogManagerMock::runUserManagementReadCommand(const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return true; +} + Status CatalogManagerMock::applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) { return Status::OK(); diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 90e25993fcb..83bcab351e2 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -94,9 +94,13 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; - bool runReadCommand(const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder* result) override; + virtual bool runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 4996af0c543..24b710ec14b 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -879,7 +879,13 @@ bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandNa return Command::appendCommandStatus(*result, status); } -bool CatalogManagerLegacy::runReadCommand(const string& dbname, +bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return runReadCommand(dbname, cmdObj, result); +} + +bool CatalogManagerLegacy::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { try { diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index 7e202d8aa2c..068b8555d91 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -101,6 +101,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 2e7d7d72f4a..2cae356706a 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -35,6 +35,7 @@ env.Library( 'catalog_manager_replica_set.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/read_concern_args', '$BUILD_DIR/mongo/s/catalog/catalog_manager', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', '$BUILD_DIR/mongo/s/client/sharding_client', @@ -58,6 +59,7 @@ env.CppUnitTest( 'catalog_manager_replica_set', '$BUILD_DIR/mongo/client/remote_command_targeter_mock', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 6296253463a..1590821c415 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -45,8 +45,10 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/network_interface.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_actionlog.h" @@ -86,8 +88,11 @@ namespace { // Until read committed is supported always write to the primary with majority write and read // from the secondary. That way we ensure that reads will see a consistent data. -const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryPreferred, TagSet{}); - +// TODO: switch back to SecondaryPreferred once SERVER-19675 is fixed +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::PrimaryOnly, TagSet{}); +const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred, + TagSet{}); +const BSONObj kReplMetadata(BSON(rpc::kReplicationMetadataFieldName << 1)); const int kInitialSSVRetries = 3; const int kActionLogCollectionSize = 1024 * 1024 * 2; const int kChangeLogCollectionSize = 1024 * 1024 * 10; @@ -169,7 +174,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, return readHost.getStatus(); } - auto countStatus = _runCountCommand( + auto countStatus = _runCountCommandOnConfig( readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::ns(ns))); if (!countStatus.isOK()) { return countStatus.getStatus(); @@ -247,10 +252,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC } // Check preconditions for removing the shard - auto countStatus = - _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << NE << name << ShardType::draining(true))); + auto countStatus = _runCountCommandOnConfig( + readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << NE << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -259,9 +264,9 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC "Can't have more than one draining shard at a time"); } - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << NE << name)); + countStatus = _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << NE << name)); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -270,9 +275,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC } // Figure out if shard is already draining - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::name() << name << ShardType::draining(true))); + countStatus = + _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -300,16 +306,16 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC // Draining has already started, now figure out how many chunks and databases are still on the // shard. - countStatus = _runCountCommand( + countStatus = _runCountCommandOnConfig( readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } const long long chunkCount = countStatus.getValue(); - countStatus = _runCountCommand(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::primary(name))); + countStatus = _runCountCommandOnConfig(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::primary(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } @@ -452,8 +458,7 @@ void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) { if (_actionLogCollectionCreated.load() == 0) { BSONObj createCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true << "size" << kActionLogCollectionSize); - auto result = - grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd); + auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd); if (!result.isOK()) { LOG(1) << "couldn't create actionlog collection: " << causedBy(result.getStatus()); return; @@ -481,8 +486,7 @@ void CatalogManagerReplicaSet::logChange(const string& clientAddress, if (_changeLogCollectionCreated.load() == 0) { BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size" << kChangeLogCollectionSize); - auto result = - grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd); + auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd); if (!result.isOK()) { LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus()); return; @@ -735,7 +739,7 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); } - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj); + auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj); if (!response.isOK()) { return Command::appendCommandStatus(*result, response.getStatus()); } @@ -746,26 +750,23 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& bool CatalogManagerReplicaSet::runReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { - auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); - auto target = targeter->findHost(kConfigReadSelector); - if (!target.isOK()) { - return Command::appendCommandStatus(*result, target.getStatus()); - } + BSONObjBuilder cmdBuilder; + cmdBuilder.appendElements(cmdObj); + _appendReadConcern(&cmdBuilder); - auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); - if (!resultStatus.isOK()) { - return Command::appendCommandStatus(*result, resultStatus.getStatus()); - } - - result->appendElements(resultStatus.getValue()); + return _runReadCommand(dbname, cmdBuilder.done(), kConfigReadSelector, result); +} - return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); +bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + return _runReadCommand(dbname, cmdObj, kConfigPrimaryPreferredSelector, result); } Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) { BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition); - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", cmd); + auto response = _runCommandOnConfigWithNotMasterRetries("config", cmd); if (!response.isOK()) { return response.getStatus(); @@ -792,7 +793,7 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandReque invariant(dbname == "config" || dbname == "admin"); const BSONObj cmdObj = batchRequest.toBSON(); - auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj); + auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj); if (!response.isOK()) { _toBatchError(response.getStatus(), batchResponse); return; @@ -896,11 +897,16 @@ StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() const return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); } -StatusWith<long long> CatalogManagerReplicaSet::_runCountCommand(const HostAndPort& target, - const NamespaceString& ns, - BSONObj query) { - BSONObj countCmd = BSON("count" << ns.coll() << "query" << query); - auto responseStatus = grid.shardRegistry()->runCommand(target, ns.db().toString(), countCmd); +StatusWith<long long> CatalogManagerReplicaSet::_runCountCommandOnConfig(const HostAndPort& target, + const NamespaceString& ns, + BSONObj query) { + BSONObjBuilder countBuilder; + countBuilder.append("count", ns.coll()); + countBuilder.append("query", query); + _appendReadConcern(&countBuilder); + + auto responseStatus = _runCommandOnConfig(target, ns.db().toString(), countBuilder.done()); + if (!responseStatus.isOK()) { return responseStatus.getStatus(); } @@ -995,8 +1001,7 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() { } if (queryResults.empty()) { - auto cmdStatus = - grid.shardRegistry()->runCommand(readHost, "admin", BSON("listDatabases" << 1)); + auto cmdStatus = _runCommandOnConfig(readHost, "admin", BSON("listDatabases" << 1)); if (!cmdStatus.isOK()) { return cmdStatus.getStatus(); } @@ -1036,4 +1041,76 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() { return versionTypeResult.getValue(); } +StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfig(const HostAndPort& target, + const string& dbName, + BSONObj cmdObj) { + auto result = + grid.shardRegistry()->runCommandWithMetadata(target, dbName, cmdObj, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + const auto& response = result.getValue(); + + _updateLastSeenConfigOpTime(response.opTime); + + return response.response; +} + +StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRetries( + const std::string& dbName, BSONObj cmdObj) { + auto result = grid.shardRegistry()->runCommandWithNotMasterRetries( + "config", dbName, cmdObj, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + const auto& response = result.getValue(); + + _updateLastSeenConfigOpTime(response.opTime); + + return response.response; +} + +repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _configOpTime; +} + +void CatalogManagerReplicaSet::_updateLastSeenConfigOpTime(const repl::OpTime& optime) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_configOpTime < optime) { + _configOpTime = optime; + } +} + +void CatalogManagerReplicaSet::_appendReadConcern(BSONObjBuilder* builder) { + repl::ReadConcernArgs readConcern(_getConfigOpTime(), + repl::ReadConcernLevel::kMajorityReadConcern); + readConcern.appendInfo(builder); +} + +bool CatalogManagerReplicaSet::_runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + const ReadPreferenceSetting& settings, + BSONObjBuilder* result) { + auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); + auto target = targeter->findHost(settings); + if (!target.isOK()) { + return Command::appendCommandStatus(*result, target.getStatus()); + } + + auto resultStatus = _runCommandOnConfig(target.getValue(), dbname, cmdObj); + if (!resultStatus.isOK()) { + return Command::appendCommandStatus(*result, resultStatus.getStatus()); + } + + result->appendElements(resultStatus.getValue()); + + return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index ef9abf9e734..18c2bec94b1 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/client/connection_string.h" +#include "mongo/db/repl/optime.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/stdx/mutex.h" @@ -36,6 +37,7 @@ namespace mongo { class NamespaceString; +struct ReadPreferenceSetting; class VersionType; /** @@ -102,6 +104,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder* result) override; + bool runUserManagementReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) override; + Status applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) override; @@ -126,36 +132,74 @@ private: StatusWith<std::string> _generateNewShardName() const override; + bool _runReadCommand(const std::string& dbname, + const BSONObj& cmdObj, + const ReadPreferenceSetting& settings, + BSONObjBuilder* result); + /** * Helper method for running a count command against a given target server with appropriate * error handling. */ - StatusWith<long long> _runCountCommand(const HostAndPort& target, - const NamespaceString& ns, - BSONObj query); + StatusWith<long long> _runCountCommandOnConfig(const HostAndPort& target, + const NamespaceString& ns, + BSONObj query); + + StatusWith<BSONObj> _runCommandOnConfig(const HostAndPort& target, + const std::string& dbName, + BSONObj cmdObj); + + StatusWith<BSONObj> _runCommandOnConfigWithNotMasterRetries(const std::string& dbName, + BSONObj cmdObj); + + /** + * Appends a read committed read concern to the request object. + */ + void _appendReadConcern(BSONObjBuilder* builder); /** * Returns the current cluster schema/protocol version. */ StatusWith<VersionType> _getConfigVersion(); + /** + * Returns the highest last known config server opTime. + */ + repl::OpTime _getConfigOpTime(); + + /** + * Updates the last known config server opTime if the given opTime is newer. + */ + void _updateLastSeenConfigOpTime(const repl::OpTime& optime); + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (F) Self synchronizing. + // (M) Must hold _mutex for access. + // (R) Read only, can only be written during initialization. + // + + stdx::mutex _mutex; + // Config server connection string - ConnectionString _configServerConnectionString; + ConnectionString _configServerConnectionString; // (R) // Distribted lock manager singleton. - std::unique_ptr<DistLockManager> _distLockManager; + std::unique_ptr<DistLockManager> _distLockManager; // (R) // Whether the logAction call should attempt to create the actionlog collection - AtomicInt32 _actionLogCollectionCreated; + AtomicInt32 _actionLogCollectionCreated; // (F) // Whether the logChange call should attempt to create the changelog collection - AtomicInt32 _changeLogCollectionCreated; - - // protects _inShutdown - stdx::mutex _mutex; + AtomicInt32 _changeLogCollectionCreated; // (F) // True if shutDown() has been called. False, otherwise. - bool _inShutdown = false; + bool _inShutdown = false; // (M) + + // Last known highest opTime from the config server. + repl::OpTime _configOpTime; // (M) }; } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index 0dbbeaab975..14c5b87da11 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -78,6 +78,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("isdbgrid" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus( responseBuilder, Status(ErrorCodes::CommandNotFound, "isdbgrid command not found")); @@ -90,6 +92,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("isMaster" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ismaster" << true); }); @@ -98,6 +102,8 @@ protected: ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("replSetGetStatus" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus( responseBuilder, @@ -116,6 +122,8 @@ protected: const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -185,6 +193,8 @@ TEST_F(AddShardTest, AddShardStandalone) { ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONArrayBuilder arr; arr.append(BSON("name" @@ -290,6 +300,8 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { ASSERT_EQ(request.dbname, "admin"); ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1)); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + BSONArrayBuilder arr; arr.append(BSON("name" diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp index 0a053edde55..d168b62edd0 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp @@ -92,15 +92,20 @@ public: ASSERT_EQ(_dropNS.db(), request.dbname); ASSERT_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ns" << _dropNS.ns() << "ok" << 1); }); } void expectRemoveChunksAndMarkCollectionDropped() { onCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); ASSERT_EQ(_configHost, request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ delete: "chunks", deletes: [{ q: { ns: "test.user" }, limit: 0 }], @@ -132,6 +137,8 @@ public: ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("unsetSharding" << 1), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("n" << 1 << "ok" << 1); }); } @@ -219,6 +226,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) { ASSERT_EQ(dropNS().db(), request.dbname); ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound); }); @@ -227,6 +236,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) { ASSERT_EQ(dropNS().db(), request.dbname); ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound); }); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp index f0cf6218fa8..cbb0ded71d6 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp @@ -60,6 +60,9 @@ public: void expectActionLogCreate(const BSONObj& response) { onCommand([&response](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCreateCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true << "size" << 1024 * 1024 * 2); ASSERT_EQUALS(expectedCreateCmd, request.cmdObj); @@ -72,6 +75,8 @@ public: onCommand([&expectedActionLog](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp index 970fadff866..dc12eeb5a7d 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp @@ -146,6 +146,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -318,6 +320,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedDeleteRequest actualBatchedDelete; std::string errmsg; ASSERT_TRUE(actualBatchedDelete.parseBSON(request.dbname, request.cmdObj, &errmsg)); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp index 6fc42a28dd8..3c0a59a68ec 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp @@ -105,6 +105,8 @@ public: ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -173,6 +175,8 @@ public: ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -729,6 +733,8 @@ TEST_F(ShardCollectionTest, withInitialData) { ASSERT_EQUALS(0, request.cmdObj["maxSplitPoints"].numberLong()); ASSERT_EQUALS(0, request.cmdObj["maxChunkObjects"].numberLong()); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "splitKeys" << BSON_ARRAY(splitPoint0 << splitPoint1 << splitPoint2 << splitPoint3)); }); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index e08996461f5..8d295a29e6c 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h" @@ -64,6 +65,7 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using executor::TaskExecutor; +using rpc::ReplSetMetadata; using std::string; using std::vector; using stdx::chrono::milliseconds; @@ -482,8 +484,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) { auto future = launchAsync([this] { BSONObjBuilder responseBuilder; - bool ok = - catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder); + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); ASSERT_TRUE(ok); BSONObj response = responseBuilder.obj(); @@ -493,6 +495,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) { }); onCommand([](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + ASSERT_EQUALS("test", request.dbname); ASSERT_EQUALS(BSON("usersInfo" << 1), request.cmdObj); @@ -508,7 +512,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommandUnsatisfiedReadPre Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up")); BSONObjBuilder responseBuilder; - bool ok = catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder); + bool ok = catalogManager()->runUserManagementReadCommand( + "test", BSON("usersInfo" << 1), &responseBuilder); ASSERT_FALSE(ok); Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj()); @@ -571,6 +576,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandSuccess) { << "test"), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::UserNotFound, "User test@test not found")); @@ -669,6 +676,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandNotMasterRetrySuc << "test"), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return BSON("ok" << 1); }); @@ -1212,6 +1221,8 @@ TEST_F(CatalogManagerReplSetTest, UpdateDatabase) { onCommand([dbt](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1284,6 +1295,9 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecated) { onCommand([updateOps, preCondition](const RemoteCommandRequest& request) { ASSERT_EQUALS("config", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj()); ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj()); @@ -1316,6 +1330,8 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedCommandFailed) { ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj()); ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj()); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::BadValue, "precondition failed")); @@ -1399,6 +1415,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 10); }); @@ -1409,6 +1427,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 1); }); @@ -1419,6 +1439,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 100); }); @@ -1427,6 +1449,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1657,6 +1681,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 10); }); @@ -1667,6 +1693,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 1); }); @@ -1677,6 +1705,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return BSON("ok" << 1 << "totalSize" << 100); }); @@ -1685,6 +1715,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedInsertRequest actualBatchedInsert; std::string errmsg; ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -1765,6 +1797,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); + return fromjson(R"({ databases: [], totalSize: 1, @@ -1776,6 +1810,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ update: "databases", updates: [{ @@ -1871,6 +1907,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingDBExists) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BSONObj expectedCmd(fromjson(R"({ update: "databases", updates: [{ @@ -1948,5 +1986,128 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExistsNoShards) { future.timed_get(kFutureTimeout); } +TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + repl::OpTime lastOpTime; + for (int x = 0; x < 3; x++) { + auto future = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE( + catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + const repl::OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5); + + onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + // Now wait for the runReadCommand call to return + future.timed_get(kFutureTimeout); + + lastOpTime = newOpTime; + } +} + +TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + // Initialize the internal config OpTime + auto future1 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + repl::OpTime highestOpTime; + const repl::OpTime newOpTime(Timestamp(7, 6), 5); + + onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future1.timed_get(kFutureTimeout); + + highestOpTime = newOpTime; + + // Return an older OpTime + auto future2 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + + onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, oldOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future2.timed_get(kFutureTimeout); + + // Check that older OpTime does not override highest OpTime + auto future3 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, oldOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future3.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp index 767555bcc13..3b0036418a5 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp @@ -40,6 +40,7 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/lite_parsed_query.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" @@ -179,6 +180,11 @@ void CatalogManagerReplSetTestFixture::onCommand(NetworkTestEnv::OnCommandFuncti _networkTestEnv->onCommand(func); } +void CatalogManagerReplSetTestFixture::onCommandWithMetadata( + NetworkTestEnv::OnCommandWithMetadataFunction func) { + _networkTestEnv->onCommandWithMetadata(func); +} + void CatalogManagerReplSetTestFixture::onFindCommand(NetworkTestEnv::OnFindCommandFunction func) { _networkTestEnv->onFindCommand(func); } @@ -312,6 +318,7 @@ void CatalogManagerReplSetTestFixture::expectUpdateCollection(const HostAndPort& const CollectionType& coll) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(expectedHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); ASSERT_EQUALS("config", request.dbname); BatchedUpdateRequest actualBatchedUpdate; @@ -342,6 +349,7 @@ void CatalogManagerReplSetTestFixture::expectSetShardVersion( const ChunkVersion& expectedChunkVersion) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(expectedHost, request.target); + ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); SetShardVersionRequest ssv = assertGet(SetShardVersionRequest::parseFromBSON(request.cmdObj)); @@ -379,10 +387,32 @@ void CatalogManagerReplSetTestFixture::expectCount(const HostAndPort& configHost return BSON("ok" << 1 << "n" << response.getValue()); } + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + BSONObjBuilder responseBuilder; Command::appendCommandStatus(responseBuilder, response.getStatus()); return responseBuilder.obj(); }); } +void CatalogManagerReplSetTestFixture::checkReadConcern(const BSONObj& cmdObj, + const Timestamp& expectedTS, + long long expectedTerm) const { + auto readConcernElem = cmdObj[repl::ReadConcernArgs::kReadConcernFieldName]; + ASSERT_EQ(Object, readConcernElem.type()); + + auto readConcernObj = readConcernElem.Obj(); + ASSERT_EQ("majority", readConcernObj[repl::ReadConcernArgs::kLevelFieldName].str()); + + auto afterElem = readConcernObj[repl::ReadConcernArgs::kOpTimeFieldName]; + ASSERT_EQ(Object, afterElem.type()); + + auto afterObj = afterElem.Obj(); + + ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTimestampFieldName)); + ASSERT_EQ(expectedTS, afterObj[repl::ReadConcernArgs::kOpTimestampFieldName].timestamp()); + ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTermFieldName)); + ASSERT_EQ(expectedTerm, afterObj[repl::ReadConcernArgs::kOpTermFieldName].numberLong()); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h index 5a1fce08408..67d8d678637 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h @@ -94,6 +94,7 @@ protected: * single request + response or find tests. */ void onCommand(executor::NetworkTestEnv::OnCommandFunction func); + void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func); void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func); /** @@ -157,6 +158,13 @@ protected: void shutdownExecutor(); + /** + * Checks that the given command has the expected settings for read after opTime. + */ + void checkReadConcern(const BSONObj& cmdObj, + const Timestamp& expectedTS, + long long expectedTerm) const; + private: std::unique_ptr<ServiceContext> _service; ServiceContext::UniqueClient _client; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp index 51afc7b40b3..c20e98a4ef9 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp @@ -151,6 +151,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" } @@ -164,6 +166,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) { ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -210,6 +214,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" }, @@ -224,6 +230,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + BatchedUpdateRequest actualBatchedUpdate; std::string errmsg; ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg)); @@ -255,6 +263,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeWriteError) { ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" } @@ -296,6 +306,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocNonEmptyConfigServer ASSERT_EQ("admin", request.dbname); ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + return fromjson(R"({ databases: [ { name: "local" }, diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index d99b3c99995..d51711e66f3 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -15,6 +15,7 @@ env.Library( '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/client/remote_command_runner_impl', '$BUILD_DIR/mongo/client/remote_command_targeter', + '$BUILD_DIR/mongo/rpc/metadata', ] ) diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index f44b800a989..90866b677b9 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -39,6 +39,7 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" @@ -322,10 +323,24 @@ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host, const std::string& dbName, const BSONObj& cmdObj) { + auto status = runCommandWithMetadata(host, dbName, cmdObj, rpc::makeEmptyMetadata()); + + if (!status.isOK()) { + return status.getStatus(); + } + + return status.getValue().response; +} + +StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithMetadata( + const HostAndPort& host, + const std::string& dbName, + const BSONObj& cmdObj, + const BSONObj& metadata) { StatusWith<executor::RemoteCommandResponse> responseStatus = Status(ErrorCodes::InternalError, "Internal error running command"); - executor::RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); + executor::RemoteCommandRequest request(host, dbName, cmdObj, metadata, kConfigCommandTimeout); auto callStatus = _executor->scheduleRemoteCommand(request, [&responseStatus](const RemoteCommandCallbackArgs& args) { @@ -342,12 +357,42 @@ StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host, return responseStatus.getStatus(); } - return responseStatus.getValue().data; + auto response = responseStatus.getValue(); + + CommandResponse cmdResponse; + cmdResponse.response = response.data; + + if (auto replField = response.metadata[rpc::kReplicationMetadataFieldName]) { + auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(replField.Obj()); + + if (!replParseStatus.isOK()) { + return replParseStatus.getStatus(); + } + + // TODO: SERVER-19734 use config server snapshot time. + cmdResponse.opTime = replParseStatus.getValue().getLastCommittedOptime(); + } + + return cmdResponse; } StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(const ShardId& shardId, const std::string& dbname, const BSONObj& cmdObj) { + auto status = runCommandWithNotMasterRetries(shardId, dbname, cmdObj, rpc::makeEmptyMetadata()); + + if (!status.isOK()) { + return status.getStatus(); + } + + return status.getValue().response; +} + +StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithNotMasterRetries( + const ShardId& shardId, + const std::string& dbname, + const BSONObj& cmdObj, + const BSONObj& metadata) { auto targeter = getShard(shardId)->getTargeter(); const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet{}); @@ -365,12 +410,12 @@ StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(const ShardId& return target.getStatus(); } - auto response = runCommand(target.getValue(), dbname, cmdObj); + auto response = runCommandWithMetadata(target.getValue(), dbname, cmdObj, metadata); if (!response.isOK()) { return response.getStatus(); } - Status commandStatus = getStatusFromCommandResult(response.getValue()); + Status commandStatus = getStatusFromCommandResult(response.getValue().response); if (ErrorCodes::NotMaster == commandStatus || ErrorCodes::NotMasterNoSlaveOkCode == commandStatus) { targeter->markHostNotMaster(target.getValue()); diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 6f5fc76ea4c..634bcdf2340 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -34,6 +34,8 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/optime.h" #include "mongo/s/client/shard.h" #include "mongo/stdx/mutex.h" @@ -65,6 +67,11 @@ class ShardRegistry { MONGO_DISALLOW_COPYING(ShardRegistry); public: + struct CommandResponse { + BSONObj response; + repl::OpTime opTime; + }; + /** * Instantiates a new shard registry. * @@ -153,6 +160,14 @@ public: * Runs a command against the specified host and returns the result. It is the responsibility * of the caller to check the returned BSON for command-specific failures. */ + StatusWith<CommandResponse> runCommandWithMetadata(const HostAndPort& host, + const std::string& dbName, + const BSONObj& cmdObj, + const BSONObj& metadata); + + /** + * Runs a command against the specified host and returns the result. + */ StatusWith<BSONObj> runCommand(const HostAndPort& host, const std::string& dbName, const BSONObj& cmdObj); @@ -172,6 +187,11 @@ public: const std::string& dbname, const BSONObj& cmdObj); + StatusWith<CommandResponse> runCommandWithNotMasterRetries(const ShardId& shardId, + const std::string& dbname, + const BSONObj& cmdObj, + const BSONObj& metadata); + private: typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap; diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp index 0bb67e31c2a..d2a6ab667d4 100644 --- a/src/mongo/s/commands/cluster_user_management_commands.cpp +++ b/src/mongo/s/commands/cluster_user_management_commands.cpp @@ -352,7 +352,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runReadCommand(dbname, cmdObj, &result); + return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result); } } cmdUsersInfo; @@ -710,7 +710,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runReadCommand(dbname, cmdObj, &result); + return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result); } } cmdRolesInfo; diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index baf0c58a270..90ad37bb223 100755 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -828,6 +828,9 @@ function appendSetParameterArgs(argArray) { }); } } + if (argArray.indexOf('--configsvr') > 0 && argArray.indexOf('--replSet') > 0) { + argArray.push.apply(argArray, ['--setParameter', "enableReplSnapshotThread=1"]); + } } } return argArray; |