summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2015-07-29 17:24:35 -0400
committerRandolph Tan <randolph@10gen.com>2015-08-06 09:58:01 -0400
commit1cfa49411655640a5b1d2da60573c3f01d3c7c38 (patch)
treead5b8f6c570905441a51fb283910125f6ebfae65
parent1eee2b3a8c079c15ddc79e03e1b1d16b37d427d2 (diff)
downloadmongo-1cfa49411655640a5b1d2da60573c3f01d3c7c38.tar.gz
SERVER-19390 Make config server read commands do read committed
-rw-r--r--buildscripts/resmokelib/core/programs.py5
-rw-r--r--buildscripts/resmokelib/testing/fixtures/shardedcluster.py8
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_s.cpp15
-rw-r--r--src/mongo/db/auth/user_cache_invalidator_job.cpp2
-rw-r--r--src/mongo/executor/network_test_env.cpp11
-rw-r--r--src/mongo/executor/network_test_env.h4
-rw-r--r--src/mongo/s/catalog/catalog_manager.h9
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.cpp8
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.h10
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp8
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h4
-rw-r--r--src/mongo/s/catalog/replset/SConscript2
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp163
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h66
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp12
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp11
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp5
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp4
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp6
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp167
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp30
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h8
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp12
-rw-r--r--src/mongo/s/client/SConscript1
-rw-r--r--src/mongo/s/client/shard_registry.cpp53
-rw-r--r--src/mongo/s/client/shard_registry.h20
-rw-r--r--src/mongo/s/commands/cluster_user_management_commands.cpp4
-rwxr-xr-xsrc/mongo/shell/servers.js3
28 files changed, 572 insertions, 79 deletions
diff --git a/buildscripts/resmokelib/core/programs.py b/buildscripts/resmokelib/core/programs.py
index cce38bcd054..929157dbf9e 100644
--- a/buildscripts/resmokelib/core/programs.py
+++ b/buildscripts/resmokelib/core/programs.py
@@ -58,6 +58,11 @@ def mongod_program(logger, executable=None, process_kwargs=None, **kwargs):
else:
kwargs[opt_name] = shortcut_opts[opt_name]
+ # Override the storage engine specified on the command line with "wiredTiger" if running a
+ # config server replica set.
+ if "replSet" in kwargs and "configsvr" in kwargs:
+ kwargs["storageEngine"] = "wiredTiger"
+
# Apply the rest of the command line arguments.
_apply_kwargs(args, kwargs)
diff --git a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
index 5224b67cb6f..c70eb81b596 100644
--- a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
+++ b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
@@ -4,6 +4,7 @@ Sharded cluster fixture for executing JSTests against.
from __future__ import absolute_import
+import copy
import os.path
import time
@@ -171,10 +172,11 @@ class ShardedClusterFixture(interface.Fixture):
logger_name = "%s:configsvr" % (self.logger.name)
mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)
- mongod_options = self.mongod_options.copy()
+ mongod_options = copy.deepcopy(self.mongod_options)
mongod_options["configsvr"] = ""
mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "config")
mongod_options["replSet"] = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME
+ mongod_options["set_parameters"]["enableReplSnapshotThread"] = 1
return replicaset.ReplicaSetFixture(mongod_logger,
self.job_num,
@@ -193,7 +195,7 @@ class ShardedClusterFixture(interface.Fixture):
logger_name = "%s:shard%d" % (self.logger.name, index)
mongod_logger = logging.loggers.new_logger(logger_name, parent=self.logger)
- mongod_options = self.mongod_options.copy()
+ mongod_options = copy.deepcopy(self.mongod_options)
mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "shard%d" % (index))
return standalone.MongoDFixture(mongod_logger,
@@ -211,7 +213,7 @@ class ShardedClusterFixture(interface.Fixture):
logger_name = "%s:mongos" % (self.logger.name)
mongos_logger = logging.loggers.new_logger(logger_name, parent=self.logger)
- mongos_options = self.mongos_options.copy()
+ mongos_options = copy.deepcopy(self.mongos_options)
if self.separate_configsvr:
configdb_replset = ShardedClusterFixture._CONFIGSVR_REPLSET_NAME
configdb_port = self.configsvr.port
diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp
index bb62e56168c..b71cf0cf08c 100644
--- a/src/mongo/db/auth/authz_manager_external_state_s.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp
@@ -69,7 +69,8 @@ Status AuthzManagerExternalStateMongos::getStoredAuthorizationVersion(OperationC
// that runs this command
BSONObj getParameterCmd = BSON("getParameter" << 1 << authSchemaVersionServerParameter << 1);
BSONObjBuilder builder;
- const bool ok = grid.catalogManager()->runReadCommand("admin", getParameterCmd, &builder);
+ const bool ok =
+ grid.catalogManager()->runUserManagementReadCommand("admin", getParameterCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -94,7 +95,8 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn
<< userName.getDB())) << "showPrivileges" << true
<< "showCredentials" << true);
BSONObjBuilder builder;
- const bool ok = grid.catalogManager()->runReadCommand("admin", usersInfoCmd, &builder);
+ const bool ok =
+ grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -123,7 +125,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN
<< roleName.getRole() << AuthorizationManager::ROLE_DB_FIELD_NAME
<< roleName.getDB())) << "showPrivileges" << showPrivileges);
BSONObjBuilder builder;
- const bool ok = grid.catalogManager()->runReadCommand("admin", rolesInfoCmd, &builder);
+ const bool ok =
+ grid.catalogManager()->runUserManagementReadCommand("admin", rolesInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -150,7 +153,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri
BSONObj rolesInfoCmd = BSON("rolesInfo" << 1 << "showPrivileges" << showPrivileges
<< "showBuiltinRoles" << showBuiltinRoles);
BSONObjBuilder builder;
- const bool ok = grid.catalogManager()->runReadCommand(dbname, rolesInfoCmd, &builder);
+ const bool ok =
+ grid.catalogManager()->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -164,7 +168,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri
bool AuthzManagerExternalStateMongos::hasAnyPrivilegeDocuments(OperationContext* txn) {
BSONObj usersInfoCmd = BSON("usersInfo" << 1);
BSONObjBuilder builder;
- const bool ok = grid.catalogManager()->runReadCommand("admin", usersInfoCmd, &builder);
+ const bool ok =
+ grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
if (!ok) {
// If we were unable to complete the query,
// it's best to assume that there _are_ privilege documents. This might happen
diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp
index 2e44a9ff65a..dd44b200f60 100644
--- a/src/mongo/db/auth/user_cache_invalidator_job.cpp
+++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp
@@ -91,7 +91,7 @@ public:
StatusWith<OID> getCurrentCacheGeneration() {
try {
BSONObjBuilder result;
- const bool ok = grid.catalogManager()->runReadCommand(
+ const bool ok = grid.catalogManager()->runUserManagementReadCommand(
"admin", BSON("_getUserCacheGeneration" << 1), &result);
if (!ok) {
return Command::getStatusFromCommandResult(result.obj());
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp
index c65d1adcf2a..64241e72c12 100644
--- a/src/mongo/executor/network_test_env.cpp
+++ b/src/mongo/executor/network_test_env.cpp
@@ -64,6 +64,17 @@ void NetworkTestEnv::onCommand(OnCommandFunction func) {
_mockNetwork->exitNetwork();
}
+void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) {
+ _mockNetwork->enterNetwork();
+
+ const NetworkInterfaceMock::NetworkOperationIterator noi = _mockNetwork->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), func(request));
+ _mockNetwork->runReadyNetworkOperations();
+
+ _mockNetwork->exitNetwork();
+}
+
void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) {
onCommand([&func](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
const auto& resultStatus = func(request);
diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h
index b68b15a5a7f..6ef0f242597 100644
--- a/src/mongo/executor/network_test_env.h
+++ b/src/mongo/executor/network_test_env.h
@@ -129,8 +129,9 @@ public:
std::move(future), _executor, _mockNetwork};
}
-
using OnCommandFunction = stdx::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>;
+ using OnCommandWithMetadataFunction =
+ stdx::function<StatusWith<RemoteCommandResponse>(const RemoteCommandRequest&)>;
using OnFindCommandFunction =
stdx::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>;
@@ -146,6 +147,7 @@ public:
* single request + response or find tests.
*/
void onCommand(OnCommandFunction func);
+ void onCommandWithMetadata(OnCommandWithMetadataFunction func);
void onFindCommand(OnFindCommandFunction func);
private:
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 2f3ee76fcbf..366e6c63644 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -274,13 +274,20 @@ public:
BSONObjBuilder* result) = 0;
/**
- * Runs a read-only command on a single config server.
+ * Runs a read-only command on a config server.
*/
virtual bool runReadCommand(const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) = 0;
/**
+ * Runs a user management related read-only command on a config server.
+ */
+ virtual bool runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) = 0;
+
+ /**
* Applies oplog entries to the config servers.
* Used by mergeChunk, splitChunk, and moveChunk commands.
*
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index 7e4275409f1..ab2cf24bfc9 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -133,12 +133,18 @@ bool CatalogManagerMock::runUserManagementWriteCommand(const string& commandName
return true;
}
-bool CatalogManagerMock::runReadCommand(const string& dbname,
+bool CatalogManagerMock::runReadCommand(const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
return true;
}
+bool CatalogManagerMock::runUserManagementReadCommand(const string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ return true;
+}
+
Status CatalogManagerMock::applyChunkOpsDeprecated(const BSONArray& updateOps,
const BSONArray& preCondition) {
return Status::OK();
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 90e25993fcb..83bcab351e2 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -94,9 +94,13 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder* result) override;
- bool runReadCommand(const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
+ virtual bool runReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
+
+ bool runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
Status applyChunkOpsDeprecated(const BSONArray& updateOps,
const BSONArray& preCondition) override;
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 4996af0c543..24b710ec14b 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -879,7 +879,13 @@ bool CatalogManagerLegacy::runUserManagementWriteCommand(const string& commandNa
return Command::appendCommandStatus(*result, status);
}
-bool CatalogManagerLegacy::runReadCommand(const string& dbname,
+bool CatalogManagerLegacy::runUserManagementReadCommand(const string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ return runReadCommand(dbname, cmdObj, result);
+}
+
+bool CatalogManagerLegacy::runReadCommand(const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
try {
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index 7e202d8aa2c..068b8555d91 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -101,6 +101,10 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder* result) override;
+ bool runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
+
Status applyChunkOpsDeprecated(const BSONArray& updateOps,
const BSONArray& preCondition) override;
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 2e7d7d72f4a..2cae356706a 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -35,6 +35,7 @@ env.Library(
'catalog_manager_replica_set.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/read_concern_args',
'$BUILD_DIR/mongo/s/catalog/catalog_manager',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
'$BUILD_DIR/mongo/s/client/sharding_client',
@@ -58,6 +59,7 @@ env.CppUnitTest(
'catalog_manager_replica_set',
'$BUILD_DIR/mongo/client/remote_command_targeter_mock',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/executor/network_test_env',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 6296253463a..1590821c415 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -45,8 +45,10 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_actionlog.h"
@@ -86,8 +88,11 @@ namespace {
// Until read committed is supported always write to the primary with majority write and read
// from the secondary. That way we ensure that reads will see a consistent data.
-const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryPreferred, TagSet{});
-
+// TODO: switch back to SecondaryPreferred once SERVER-19675 is fixed
+const ReadPreferenceSetting kConfigReadSelector(ReadPreference::PrimaryOnly, TagSet{});
+const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
+ TagSet{});
+const BSONObj kReplMetadata(BSON(rpc::kReplicationMetadataFieldName << 1));
const int kInitialSSVRetries = 3;
const int kActionLogCollectionSize = 1024 * 1024 * 2;
const int kChangeLogCollectionSize = 1024 * 1024 * 10;
@@ -169,7 +174,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
return readHost.getStatus();
}
- auto countStatus = _runCountCommand(
+ auto countStatus = _runCountCommandOnConfig(
readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::ns(ns)));
if (!countStatus.isOK()) {
return countStatus.getStatus();
@@ -247,10 +252,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
}
// Check preconditions for removing the shard
- auto countStatus =
- _runCountCommand(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << name << ShardType::draining(true)));
+ auto countStatus = _runCountCommandOnConfig(
+ readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << NE << name << ShardType::draining(true)));
if (!countStatus.isOK()) {
return countStatus.getStatus();
}
@@ -259,9 +264,9 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
"Can't have more than one draining shard at a time");
}
- countStatus = _runCountCommand(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << name));
+ countStatus = _runCountCommandOnConfig(readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << NE << name));
if (!countStatus.isOK()) {
return countStatus.getStatus();
}
@@ -270,9 +275,10 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
}
// Figure out if shard is already draining
- countStatus = _runCountCommand(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << name << ShardType::draining(true)));
+ countStatus =
+ _runCountCommandOnConfig(readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << name << ShardType::draining(true)));
if (!countStatus.isOK()) {
return countStatus.getStatus();
}
@@ -300,16 +306,16 @@ StatusWith<ShardDrainingStatus> CatalogManagerReplicaSet::removeShard(OperationC
// Draining has already started, now figure out how many chunks and databases are still on the
// shard.
- countStatus = _runCountCommand(
+ countStatus = _runCountCommandOnConfig(
readHost.getValue(), NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name)));
if (!countStatus.isOK()) {
return countStatus.getStatus();
}
const long long chunkCount = countStatus.getValue();
- countStatus = _runCountCommand(readHost.getValue(),
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::primary(name)));
+ countStatus = _runCountCommandOnConfig(readHost.getValue(),
+ NamespaceString(DatabaseType::ConfigNS),
+ BSON(DatabaseType::primary(name)));
if (!countStatus.isOK()) {
return countStatus.getStatus();
}
@@ -452,8 +458,7 @@ void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) {
if (_actionLogCollectionCreated.load() == 0) {
BSONObj createCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true << "size"
<< kActionLogCollectionSize);
- auto result =
- grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd);
+ auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd);
if (!result.isOK()) {
LOG(1) << "couldn't create actionlog collection: " << causedBy(result.getStatus());
return;
@@ -481,8 +486,7 @@ void CatalogManagerReplicaSet::logChange(const string& clientAddress,
if (_changeLogCollectionCreated.load() == 0) {
BSONObj createCmd = BSON("create" << ChangeLogType::ConfigNS << "capped" << true << "size"
<< kChangeLogCollectionSize);
- auto result =
- grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", createCmd);
+ auto result = _runCommandOnConfigWithNotMasterRetries("config", createCmd);
if (!result.isOK()) {
LOG(1) << "couldn't create changelog collection: " << causedBy(result.getStatus());
return;
@@ -735,7 +739,7 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string&
return Command::appendCommandStatus(*result, scopedDistLock.getStatus());
}
- auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj);
+ auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj);
if (!response.isOK()) {
return Command::appendCommandStatus(*result, response.getStatus());
}
@@ -746,26 +750,23 @@ bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string&
bool CatalogManagerReplicaSet::runReadCommand(const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
- auto targeter = grid.shardRegistry()->getShard("config")->getTargeter();
- auto target = targeter->findHost(kConfigReadSelector);
- if (!target.isOK()) {
- return Command::appendCommandStatus(*result, target.getStatus());
- }
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.appendElements(cmdObj);
+ _appendReadConcern(&cmdBuilder);
- auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj);
- if (!resultStatus.isOK()) {
- return Command::appendCommandStatus(*result, resultStatus.getStatus());
- }
-
- result->appendElements(resultStatus.getValue());
+ return _runReadCommand(dbname, cmdBuilder.done(), kConfigReadSelector, result);
+}
- return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK();
+bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ return _runReadCommand(dbname, cmdObj, kConfigPrimaryPreferredSelector, result);
}
Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps,
const BSONArray& preCondition) {
BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition);
- auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", "config", cmd);
+ auto response = _runCommandOnConfigWithNotMasterRetries("config", cmd);
if (!response.isOK()) {
return response.getStatus();
@@ -792,7 +793,7 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandReque
invariant(dbname == "config" || dbname == "admin");
const BSONObj cmdObj = batchRequest.toBSON();
- auto response = grid.shardRegistry()->runCommandWithNotMasterRetries("config", dbname, cmdObj);
+ auto response = _runCommandOnConfigWithNotMasterRetries(dbname, cmdObj);
if (!response.isOK()) {
_toBatchError(response.getStatus(), batchResponse);
return;
@@ -896,11 +897,16 @@ StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() const
return Status(ErrorCodes::OperationFailed, "unable to generate new shard name");
}
-StatusWith<long long> CatalogManagerReplicaSet::_runCountCommand(const HostAndPort& target,
- const NamespaceString& ns,
- BSONObj query) {
- BSONObj countCmd = BSON("count" << ns.coll() << "query" << query);
- auto responseStatus = grid.shardRegistry()->runCommand(target, ns.db().toString(), countCmd);
+StatusWith<long long> CatalogManagerReplicaSet::_runCountCommandOnConfig(const HostAndPort& target,
+ const NamespaceString& ns,
+ BSONObj query) {
+ BSONObjBuilder countBuilder;
+ countBuilder.append("count", ns.coll());
+ countBuilder.append("query", query);
+ _appendReadConcern(&countBuilder);
+
+ auto responseStatus = _runCommandOnConfig(target, ns.db().toString(), countBuilder.done());
+
if (!responseStatus.isOK()) {
return responseStatus.getStatus();
}
@@ -995,8 +1001,7 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() {
}
if (queryResults.empty()) {
- auto cmdStatus =
- grid.shardRegistry()->runCommand(readHost, "admin", BSON("listDatabases" << 1));
+ auto cmdStatus = _runCommandOnConfig(readHost, "admin", BSON("listDatabases" << 1));
if (!cmdStatus.isOK()) {
return cmdStatus.getStatus();
}
@@ -1036,4 +1041,76 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() {
return versionTypeResult.getValue();
}
+StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfig(const HostAndPort& target,
+ const string& dbName,
+ BSONObj cmdObj) {
+ auto result =
+ grid.shardRegistry()->runCommandWithMetadata(target, dbName, cmdObj, kReplMetadata);
+
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+
+ const auto& response = result.getValue();
+
+ _updateLastSeenConfigOpTime(response.opTime);
+
+ return response.response;
+}
+
+StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRetries(
+ const std::string& dbName, BSONObj cmdObj) {
+ auto result = grid.shardRegistry()->runCommandWithNotMasterRetries(
+ "config", dbName, cmdObj, kReplMetadata);
+
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+
+ const auto& response = result.getValue();
+
+ _updateLastSeenConfigOpTime(response.opTime);
+
+ return response.response;
+}
+
+repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _configOpTime;
+}
+
+void CatalogManagerReplicaSet::_updateLastSeenConfigOpTime(const repl::OpTime& optime) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_configOpTime < optime) {
+ _configOpTime = optime;
+ }
+}
+
+void CatalogManagerReplicaSet::_appendReadConcern(BSONObjBuilder* builder) {
+ repl::ReadConcernArgs readConcern(_getConfigOpTime(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
+ readConcern.appendInfo(builder);
+}
+
+bool CatalogManagerReplicaSet::_runReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& settings,
+ BSONObjBuilder* result) {
+ auto targeter = grid.shardRegistry()->getShard("config")->getTargeter();
+ auto target = targeter->findHost(settings);
+ if (!target.isOK()) {
+ return Command::appendCommandStatus(*result, target.getStatus());
+ }
+
+ auto resultStatus = _runCommandOnConfig(target.getValue(), dbname, cmdObj);
+ if (!resultStatus.isOK()) {
+ return Command::appendCommandStatus(*result, resultStatus.getStatus());
+ }
+
+ result->appendElements(resultStatus.getValue());
+
+ return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index ef9abf9e734..18c2bec94b1 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/client/connection_string.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/stdx/mutex.h"
@@ -36,6 +37,7 @@
namespace mongo {
class NamespaceString;
+struct ReadPreferenceSetting;
class VersionType;
/**
@@ -102,6 +104,10 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder* result) override;
+ bool runUserManagementReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) override;
+
Status applyChunkOpsDeprecated(const BSONArray& updateOps,
const BSONArray& preCondition) override;
@@ -126,36 +132,74 @@ private:
StatusWith<std::string> _generateNewShardName() const override;
+ bool _runReadCommand(const std::string& dbname,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& settings,
+ BSONObjBuilder* result);
+
/**
* Helper method for running a count command against a given target server with appropriate
* error handling.
*/
- StatusWith<long long> _runCountCommand(const HostAndPort& target,
- const NamespaceString& ns,
- BSONObj query);
+ StatusWith<long long> _runCountCommandOnConfig(const HostAndPort& target,
+ const NamespaceString& ns,
+ BSONObj query);
+
+ StatusWith<BSONObj> _runCommandOnConfig(const HostAndPort& target,
+ const std::string& dbName,
+ BSONObj cmdObj);
+
+ StatusWith<BSONObj> _runCommandOnConfigWithNotMasterRetries(const std::string& dbName,
+ BSONObj cmdObj);
+
+ /**
+ * Appends a read committed read concern to the request object.
+ */
+ void _appendReadConcern(BSONObjBuilder* builder);
/**
* Returns the current cluster schema/protocol version.
*/
StatusWith<VersionType> _getConfigVersion();
+ /**
+ * Returns the highest last known config server opTime.
+ */
+ repl::OpTime _getConfigOpTime();
+
+ /**
+ * Updates the last known config server opTime if the given opTime is newer.
+ */
+ void _updateLastSeenConfigOpTime(const repl::OpTime& optime);
+
+ //
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (F) Self synchronizing.
+ // (M) Must hold _mutex for access.
+ // (R) Read only, can only be written during initialization.
+ //
+
+ stdx::mutex _mutex;
+
// Config server connection string
- ConnectionString _configServerConnectionString;
+ ConnectionString _configServerConnectionString; // (R)
// Distribted lock manager singleton.
- std::unique_ptr<DistLockManager> _distLockManager;
+ std::unique_ptr<DistLockManager> _distLockManager; // (R)
// Whether the logAction call should attempt to create the actionlog collection
- AtomicInt32 _actionLogCollectionCreated;
+ AtomicInt32 _actionLogCollectionCreated; // (F)
// Whether the logChange call should attempt to create the changelog collection
- AtomicInt32 _changeLogCollectionCreated;
-
- // protects _inShutdown
- stdx::mutex _mutex;
+ AtomicInt32 _changeLogCollectionCreated; // (F)
// True if shutDown() has been called. False, otherwise.
- bool _inShutdown = false;
+ bool _inShutdown = false; // (M)
+
+ // Last known highest opTime from the config server.
+ repl::OpTime _configOpTime; // (M)
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
index 0dbbeaab975..14c5b87da11 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
@@ -78,6 +78,8 @@ protected:
ASSERT_EQ(request.dbname, "admin");
ASSERT_EQ(request.cmdObj, BSON("isdbgrid" << 1));
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
BSONObjBuilder responseBuilder;
Command::appendCommandStatus(
responseBuilder, Status(ErrorCodes::CommandNotFound, "isdbgrid command not found"));
@@ -90,6 +92,8 @@ protected:
ASSERT_EQ(request.dbname, "admin");
ASSERT_EQ(request.cmdObj, BSON("isMaster" << 1));
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ismaster" << true);
});
@@ -98,6 +102,8 @@ protected:
ASSERT_EQ(request.dbname, "admin");
ASSERT_EQ(request.cmdObj, BSON("replSetGetStatus" << 1));
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
BSONObjBuilder responseBuilder;
Command::appendCommandStatus(
responseBuilder,
@@ -116,6 +122,8 @@ protected:
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -185,6 +193,8 @@ TEST_F(AddShardTest, AddShardStandalone) {
ASSERT_EQ(request.dbname, "admin");
ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1));
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
BSONArrayBuilder arr;
arr.append(BSON("name"
@@ -290,6 +300,8 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) {
ASSERT_EQ(request.dbname, "admin");
ASSERT_EQ(request.cmdObj, BSON("listDatabases" << 1));
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
BSONArrayBuilder arr;
arr.append(BSON("name"
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
index 0a053edde55..d168b62edd0 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
@@ -92,15 +92,19 @@ public:
ASSERT_EQ(_dropNS.db(), request.dbname);
ASSERT_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ns" << _dropNS.ns() << "ok" << 1);
});
}
void expectRemoveChunksAndMarkCollectionDropped() {
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(_configHost, request.target);
ASSERT_EQ("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObj expectedCmd(fromjson(R"({
delete: "chunks",
deletes: [{ q: { ns: "test.user" }, limit: 0 }],
@@ -132,6 +137,8 @@ public:
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("unsetSharding" << 1), request.cmdObj);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("n" << 1 << "ok" << 1);
});
}
@@ -219,6 +226,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
ASSERT_EQ(dropNS().db(), request.dbname);
ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound);
});
@@ -227,6 +236,8 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
ASSERT_EQ(dropNS().db(), request.dbname);
ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 0 << "code" << ErrorCodes::NamespaceNotFound);
});
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp
index f0cf6218fa8..cbb0ded71d6 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_log_action_test.cpp
@@ -60,6 +60,9 @@ public:
void expectActionLogCreate(const BSONObj& response) {
onCommand([&response](const RemoteCommandRequest& request) {
ASSERT_EQUALS("config", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObj expectedCreateCmd = BSON("create" << ActionLogType::ConfigNS << "capped" << true
<< "size" << 1024 * 1024 * 2);
ASSERT_EQUALS(expectedCreateCmd, request.cmdObj);
@@ -72,6 +75,8 @@ public:
onCommand([&expectedActionLog](const RemoteCommandRequest& request) {
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedInsertRequest actualBatchedInsert;
std::string errmsg;
ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
index 970fadff866..dc12eeb5a7d 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
@@ -146,6 +146,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) {
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -318,6 +320,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) {
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedDeleteRequest actualBatchedDelete;
std::string errmsg;
ASSERT_TRUE(actualBatchedDelete.parseBSON(request.dbname, request.cmdObj, &errmsg));
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
index 6fc42a28dd8..3c0a59a68ec 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
@@ -105,6 +105,8 @@ public:
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -173,6 +175,8 @@ public:
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -729,6 +733,8 @@ TEST_F(ShardCollectionTest, withInitialData) {
ASSERT_EQUALS(0, request.cmdObj["maxSplitPoints"].numberLong());
ASSERT_EQUALS(0, request.cmdObj["maxChunkObjects"].numberLong());
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "splitKeys"
<< BSON_ARRAY(splitPoint0 << splitPoint1 << splitPoint2 << splitPoint3));
});
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
index e08996461f5..8d295a29e6c 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h"
@@ -64,6 +65,7 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using executor::TaskExecutor;
+using rpc::ReplSetMetadata;
using std::string;
using std::vector;
using stdx::chrono::milliseconds;
@@ -482,8 +484,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) {
auto future = launchAsync([this] {
BSONObjBuilder responseBuilder;
- bool ok =
- catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder);
+ bool ok = catalogManager()->runUserManagementReadCommand(
+ "test", BSON("usersInfo" << 1), &responseBuilder);
ASSERT_TRUE(ok);
BSONObj response = responseBuilder.obj();
@@ -493,6 +495,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) {
});
onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
ASSERT_EQUALS("test", request.dbname);
ASSERT_EQUALS(BSON("usersInfo" << 1), request.cmdObj);
@@ -508,7 +512,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommandUnsatisfiedReadPre
Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up"));
BSONObjBuilder responseBuilder;
- bool ok = catalogManager()->runReadCommand("test", BSON("usersInfo" << 1), &responseBuilder);
+ bool ok = catalogManager()->runUserManagementReadCommand(
+ "test", BSON("usersInfo" << 1), &responseBuilder);
ASSERT_FALSE(ok);
Status commandStatus = Command::getStatusFromCommandResult(responseBuilder.obj());
@@ -571,6 +576,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandSuccess) {
<< "test"),
request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObjBuilder responseBuilder;
Command::appendCommandStatus(responseBuilder,
Status(ErrorCodes::UserNotFound, "User test@test not found"));
@@ -669,6 +676,8 @@ TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandNotMasterRetrySuc
<< "test"),
request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
return BSON("ok" << 1);
});
@@ -1212,6 +1221,8 @@ TEST_F(CatalogManagerReplSetTest, UpdateDatabase) {
onCommand([dbt](const RemoteCommandRequest& request) {
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -1284,6 +1295,9 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecated) {
onCommand([updateOps, preCondition](const RemoteCommandRequest& request) {
ASSERT_EQUALS("config", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj());
ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj());
@@ -1316,6 +1330,8 @@ TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedCommandFailed) {
ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj());
ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj());
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObjBuilder responseBuilder;
Command::appendCommandStatus(responseBuilder,
Status(ErrorCodes::BadValue, "precondition failed"));
@@ -1399,6 +1415,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 10);
});
@@ -1409,6 +1427,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 1);
});
@@ -1419,6 +1439,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 100);
});
@@ -1427,6 +1449,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedInsertRequest actualBatchedInsert;
std::string errmsg;
ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -1657,6 +1681,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 10);
});
@@ -1667,6 +1693,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 1);
});
@@ -1677,6 +1705,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return BSON("ok" << 1 << "totalSize" << 100);
});
@@ -1685,6 +1715,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedInsertRequest actualBatchedInsert;
std::string errmsg;
ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -1765,6 +1797,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
+
return fromjson(R"({
databases: [],
totalSize: 1,
@@ -1776,6 +1810,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
ASSERT_EQ(HostAndPort("config:123"), request.target);
ASSERT_EQ("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObj expectedCmd(fromjson(R"({
update: "databases",
updates: [{
@@ -1871,6 +1907,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingDBExists) {
ASSERT_EQ(HostAndPort("config:123"), request.target);
ASSERT_EQ("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BSONObj expectedCmd(fromjson(R"({
update: "databases",
updates: [{
@@ -1948,5 +1986,128 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExistsNoShards) {
future.timed_get(kFutureTimeout);
}
+TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ repl::OpTime lastOpTime;
+ for (int x = 0; x < 3; x++) {
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(
+ catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const repl::OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5);
+
+ onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, newOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ // Now wait for the runReadCommand call to return
+ future.timed_get(kFutureTimeout);
+
+ lastOpTime = newOpTime;
+ }
+}
+
+TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ // Initialize the internal config OpTime
+ auto future1 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ repl::OpTime highestOpTime;
+ const repl::OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, newOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const repl::OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, oldOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future2.timed_get(kFutureTimeout);
+
+ // Check that older OpTime does not override highest OpTime
+ auto future3 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, oldOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future3.timed_get(kFutureTimeout);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
index 767555bcc13..3b0036418a5 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/lite_parsed_query.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
@@ -179,6 +180,11 @@ void CatalogManagerReplSetTestFixture::onCommand(NetworkTestEnv::OnCommandFuncti
_networkTestEnv->onCommand(func);
}
+void CatalogManagerReplSetTestFixture::onCommandWithMetadata(
+ NetworkTestEnv::OnCommandWithMetadataFunction func) {
+ _networkTestEnv->onCommandWithMetadata(func);
+}
+
void CatalogManagerReplSetTestFixture::onFindCommand(NetworkTestEnv::OnFindCommandFunction func) {
_networkTestEnv->onFindCommand(func);
}
@@ -312,6 +318,7 @@ void CatalogManagerReplSetTestFixture::expectUpdateCollection(const HostAndPort&
const CollectionType& coll) {
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(expectedHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
ASSERT_EQUALS("config", request.dbname);
BatchedUpdateRequest actualBatchedUpdate;
@@ -342,6 +349,7 @@ void CatalogManagerReplSetTestFixture::expectSetShardVersion(
const ChunkVersion& expectedChunkVersion) {
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(expectedHost, request.target);
+ ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
SetShardVersionRequest ssv =
assertGet(SetShardVersionRequest::parseFromBSON(request.cmdObj));
@@ -379,10 +387,32 @@ void CatalogManagerReplSetTestFixture::expectCount(const HostAndPort& configHost
return BSON("ok" << 1 << "n" << response.getValue());
}
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
BSONObjBuilder responseBuilder;
Command::appendCommandStatus(responseBuilder, response.getStatus());
return responseBuilder.obj();
});
}
+void CatalogManagerReplSetTestFixture::checkReadConcern(const BSONObj& cmdObj,
+ const Timestamp& expectedTS,
+ long long expectedTerm) const {
+ auto readConcernElem = cmdObj[repl::ReadConcernArgs::kReadConcernFieldName];
+ ASSERT_EQ(Object, readConcernElem.type());
+
+ auto readConcernObj = readConcernElem.Obj();
+ ASSERT_EQ("majority", readConcernObj[repl::ReadConcernArgs::kLevelFieldName].str());
+
+ auto afterElem = readConcernObj[repl::ReadConcernArgs::kOpTimeFieldName];
+ ASSERT_EQ(Object, afterElem.type());
+
+ auto afterObj = afterElem.Obj();
+
+ ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTimestampFieldName));
+ ASSERT_EQ(expectedTS, afterObj[repl::ReadConcernArgs::kOpTimestampFieldName].timestamp());
+ ASSERT_TRUE(afterObj.hasField(repl::ReadConcernArgs::kOpTermFieldName));
+ ASSERT_EQ(expectedTerm, afterObj[repl::ReadConcernArgs::kOpTermFieldName].numberLong());
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
index 5a1fce08408..67d8d678637 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
@@ -94,6 +94,7 @@ protected:
* single request + response or find tests.
*/
void onCommand(executor::NetworkTestEnv::OnCommandFunction func);
+ void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func);
void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func);
/**
@@ -157,6 +158,13 @@ protected:
void shutdownExecutor();
+ /**
+ * Checks that the given command has the expected settings for read after opTime.
+ */
+ void checkReadConcern(const BSONObj& cmdObj,
+ const Timestamp& expectedTS,
+ long long expectedTerm) const;
+
private:
std::unique_ptr<ServiceContext> _service;
ServiceContext::UniqueClient _client;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
index 51afc7b40b3..c20e98a4ef9 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
@@ -151,6 +151,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) {
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
return fromjson(R"({
databases: [
{ name: "local" }
@@ -164,6 +166,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfig) {
ASSERT_EQ(HostAndPort("config:123"), request.target);
ASSERT_EQ("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -210,6 +214,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
return fromjson(R"({
databases: [
{ name: "local" },
@@ -224,6 +230,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocEmptyConfigWithAdmin
ASSERT_EQ(HostAndPort("config:123"), request.target);
ASSERT_EQ("config", request.dbname);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
BatchedUpdateRequest actualBatchedUpdate;
std::string errmsg;
ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
@@ -255,6 +263,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeWriteError) {
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
return fromjson(R"({
databases: [
{ name: "local" }
@@ -296,6 +306,8 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNoVersionDocNonEmptyConfigServer
ASSERT_EQ("admin", request.dbname);
ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
return fromjson(R"({
databases: [
{ name: "local" },
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index d99b3c99995..d51711e66f3 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -15,6 +15,7 @@ env.Library(
'$BUILD_DIR/mongo/client/fetcher',
'$BUILD_DIR/mongo/client/remote_command_runner_impl',
'$BUILD_DIR/mongo/client/remote_command_targeter',
+ '$BUILD_DIR/mongo/rpc/metadata',
]
)
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index f44b800a989..90866b677b9 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard.h"
@@ -322,10 +323,24 @@ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort
StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
const std::string& dbName,
const BSONObj& cmdObj) {
+ auto status = runCommandWithMetadata(host, dbName, cmdObj, rpc::makeEmptyMetadata());
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ return status.getValue().response;
+}
+
+StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithMetadata(
+ const HostAndPort& host,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ const BSONObj& metadata) {
StatusWith<executor::RemoteCommandResponse> responseStatus =
Status(ErrorCodes::InternalError, "Internal error running command");
- executor::RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout);
+ executor::RemoteCommandRequest request(host, dbName, cmdObj, metadata, kConfigCommandTimeout);
auto callStatus =
_executor->scheduleRemoteCommand(request,
[&responseStatus](const RemoteCommandCallbackArgs& args) {
@@ -342,12 +357,42 @@ StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
return responseStatus.getStatus();
}
- return responseStatus.getValue().data;
+ auto response = responseStatus.getValue();
+
+ CommandResponse cmdResponse;
+ cmdResponse.response = response.data;
+
+ if (auto replField = response.metadata[rpc::kReplicationMetadataFieldName]) {
+ auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(replField.Obj());
+
+ if (!replParseStatus.isOK()) {
+ return replParseStatus.getStatus();
+ }
+
+ // TODO: SERVER-19734 use config server snapshot time.
+ cmdResponse.opTime = replParseStatus.getValue().getLastCommittedOptime();
+ }
+
+ return cmdResponse;
}
StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(const ShardId& shardId,
const std::string& dbname,
const BSONObj& cmdObj) {
+ auto status = runCommandWithNotMasterRetries(shardId, dbname, cmdObj, rpc::makeEmptyMetadata());
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ return status.getValue().response;
+}
+
+StatusWith<ShardRegistry::CommandResponse> ShardRegistry::runCommandWithNotMasterRetries(
+ const ShardId& shardId,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ const BSONObj& metadata) {
auto targeter = getShard(shardId)->getTargeter();
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet{});
@@ -365,12 +410,12 @@ StatusWith<BSONObj> ShardRegistry::runCommandWithNotMasterRetries(const ShardId&
return target.getStatus();
}
- auto response = runCommand(target.getValue(), dbname, cmdObj);
+ auto response = runCommandWithMetadata(target.getValue(), dbname, cmdObj, metadata);
if (!response.isOK()) {
return response.getStatus();
}
- Status commandStatus = getStatusFromCommandResult(response.getValue());
+ Status commandStatus = getStatusFromCommandResult(response.getValue().response);
if (ErrorCodes::NotMaster == commandStatus ||
ErrorCodes::NotMasterNoSlaveOkCode == commandStatus) {
targeter->markHostNotMaster(target.getValue());
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 6f5fc76ea4c..634bcdf2340 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -34,6 +34,8 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/s/client/shard.h"
#include "mongo/stdx/mutex.h"
@@ -65,6 +67,11 @@ class ShardRegistry {
MONGO_DISALLOW_COPYING(ShardRegistry);
public:
+ struct CommandResponse {
+ BSONObj response;
+ repl::OpTime opTime;
+ };
+
/**
* Instantiates a new shard registry.
*
@@ -153,6 +160,14 @@ public:
* Runs a command against the specified host and returns the result. It is the responsibility
* of the caller to check the returned BSON for command-specific failures.
*/
+ StatusWith<CommandResponse> runCommandWithMetadata(const HostAndPort& host,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ const BSONObj& metadata);
+
+ /**
+ * Runs a command against the specified host and returns the result.
+ */
StatusWith<BSONObj> runCommand(const HostAndPort& host,
const std::string& dbName,
const BSONObj& cmdObj);
@@ -172,6 +187,11 @@ public:
const std::string& dbname,
const BSONObj& cmdObj);
+ StatusWith<CommandResponse> runCommandWithNotMasterRetries(const ShardId& shardId,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ const BSONObj& metadata);
+
private:
typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap;
diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp
index 0bb67e31c2a..d2a6ab667d4 100644
--- a/src/mongo/s/commands/cluster_user_management_commands.cpp
+++ b/src/mongo/s/commands/cluster_user_management_commands.cpp
@@ -352,7 +352,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runReadCommand(dbname, cmdObj, &result);
+ return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result);
}
} cmdUsersInfo;
@@ -710,7 +710,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runReadCommand(dbname, cmdObj, &result);
+ return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result);
}
} cmdRolesInfo;
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index baf0c58a270..90ad37bb223 100755
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -828,6 +828,9 @@ function appendSetParameterArgs(argArray) {
});
}
}
+ if (argArray.indexOf('--configsvr') > 0 && argArray.indexOf('--replSet') > 0) {
+ argArray.push.apply(argArray, ['--setParameter', "enableReplSnapshotThread=1"]);
+ }
}
}
return argArray;