summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2015-08-06 13:26:55 -0400
committerSpencer T Brody <spencer@mongodb.com>2015-08-12 11:27:43 -0400
commitbe7db282c80c981882ea67f909eb6be4e53d2d4b (patch)
tree615bcfdcc2e25e5db24b4d82f3db7cff1f9c4f91 /src/mongo
parent86a3e6352eb27fd2e6115299bcec5103a830fe36 (diff)
downloadmongo-be7db282c80c981882ea67f909eb6be4e53d2d4b.tar.gz
SERVER-19543 Thread OperationContext through to everywhere that accesses the CatalogManager
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/parallel.cpp35
-rw-r--r--src/mongo/client/parallel.h14
-rw-r--r--src/mongo/db/auth/authorization_manager.cpp10
-rw-r--r--src/mongo/db/auth/authorization_manager.h8
-rw-r--r--src/mongo/db/auth/authz_manager_external_state.h6
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_local.cpp6
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_local.h6
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_s.cpp16
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_s.h6
-rw-r--r--src/mongo/db/auth/user_cache_invalidator_job.cpp13
-rw-r--r--src/mongo/db/auth/user_cache_invalidator_job.h5
-rw-r--r--src/mongo/db/commands.h4
-rw-r--r--src/mongo/db/commands/mr.cpp6
-rw-r--r--src/mongo/db/commands/user_management_commands.cpp35
-rw-r--r--src/mongo/db/instance.cpp11
-rw-r--r--src/mongo/db/operation_context_noop.h13
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp2
-rw-r--r--src/mongo/db/s/set_shard_version_command.cpp6
-rw-r--r--src/mongo/db/s/sharding_state.cpp10
-rw-r--r--src/mongo/db/s/sharding_state.h4
-rw-r--r--src/mongo/db/storage/record_store_test_harness.h2
-rw-r--r--src/mongo/db/storage/record_store_test_touch.cpp12
-rw-r--r--src/mongo/dbtests/chunk_manager_tests.cpp6
-rw-r--r--src/mongo/dbtests/config_upgrade_tests.cpp18
-rw-r--r--src/mongo/dbtests/framework.cpp12
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/balance.cpp90
-rw-r--r--src/mongo/s/balance.h9
-rw-r--r--src/mongo/s/balancer_policy.cpp4
-rw-r--r--src/mongo/s/balancer_policy.h3
-rw-r--r--src/mongo/s/catalog/catalog_cache.cpp5
-rw-r--r--src/mongo/s/catalog/catalog_cache.h4
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp8
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp8
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp2
-rw-r--r--src/mongo/s/chunk.cpp65
-rw-r--r--src/mongo/s/chunk.h19
-rw-r--r--src/mongo/s/chunk_manager.cpp24
-rw-r--r--src/mongo/s/chunk_manager.h14
-rw-r--r--src/mongo/s/chunk_manager_targeter.cpp45
-rw-r--r--src/mongo/s/chunk_manager_targeter.h18
-rw-r--r--src/mongo/s/client/shard_connection.cpp13
-rw-r--r--src/mongo/s/client/shard_connection.h2
-rw-r--r--src/mongo/s/cluster_write.cpp24
-rw-r--r--src/mongo/s/cluster_write.h8
-rw-r--r--src/mongo/s/commands/cluster_add_shard_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_commands_common.cpp6
-rw-r--r--src/mongo/s/commands/cluster_commands_common.h2
-rw-r--r--src/mongo/s/commands/cluster_count_cmd.cpp6
-rw-r--r--src/mongo/s/commands/cluster_drop_database_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_enable_sharding_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp20
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_get_shard_version_cmd.cpp4
-rw-r--r--src/mongo/s/commands/cluster_index_filter_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_list_databases_cmd.cpp5
-rw-r--r--src/mongo/s/commands/cluster_list_shards_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp20
-rw-r--r--src/mongo/s/commands/cluster_merge_chunks_cmd.cpp8
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp13
-rw-r--r--src/mongo/s/commands/cluster_move_primary_cmd.cpp11
-rw-r--r--src/mongo/s/commands/cluster_netstat_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp7
-rw-r--r--src/mongo/s/commands/cluster_plan_cache_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_remove_shard_cmd.cpp13
-rw-r--r--src/mongo/s/commands/cluster_shard_collection_cmd.cpp20
-rw-r--r--src/mongo/s/commands/cluster_split_collection_cmd.cpp16
-rw-r--r--src/mongo/s/commands/cluster_user_management_commands.cpp64
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp15
-rw-r--r--src/mongo/s/commands/commands_public.cpp62
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.cpp9
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.h3
-rw-r--r--src/mongo/s/config.cpp130
-rw-r--r--src/mongo/s/config.h28
-rw-r--r--src/mongo/s/d_merge.cpp17
-rw-r--r--src/mongo/s/d_migrate.cpp24
-rw-r--r--src/mongo/s/d_split.cpp12
-rw-r--r--src/mongo/s/d_state.cpp4
-rw-r--r--src/mongo/s/grid.cpp14
-rw-r--r--src/mongo/s/grid.h9
-rw-r--r--src/mongo/s/mock_ns_targeter.h10
-rw-r--r--src/mongo/s/ns_targeter.h13
-rw-r--r--src/mongo/s/query/cluster_find.cpp4
-rw-r--r--src/mongo/s/request.cpp10
-rw-r--r--src/mongo/s/request.h3
-rw-r--r--src/mongo/s/s_only.cpp6
-rw-r--r--src/mongo/s/server.cpp54
-rw-r--r--src/mongo/s/strategy.cpp46
-rw-r--r--src/mongo/s/strategy.h18
-rw-r--r--src/mongo/s/version_manager.cpp39
-rw-r--r--src/mongo/s/version_manager.h7
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp8
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.h4
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp19
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp5
-rw-r--r--src/mongo/s/write_ops/batch_write_op.h4
-rw-r--r--src/mongo/s/write_ops/batch_write_op_test.cpp121
-rw-r--r--src/mongo/s/write_ops/write_op.cpp9
-rw-r--r--src/mongo/s/write_ops/write_op.h4
-rw-r--r--src/mongo/s/write_ops/write_op_test.cpp19
100 files changed, 918 insertions, 676 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index 4dc63472b90..b03dae3daeb 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -60,13 +60,13 @@ using std::vector;
LabeledLevel pc("pcursor", 2);
-void ParallelSortClusteredCursor::init() {
+void ParallelSortClusteredCursor::init(OperationContext* txn) {
if (_didInit)
return;
_didInit = true;
if (!_qSpec.isEmpty()) {
- fullInit();
+ fullInit(txn);
} else {
// You can only get here by using the legacy constructor
// TODO: Eliminate this
@@ -494,9 +494,9 @@ string ParallelSortClusteredCursor::toString() const {
return str::stream() << "PCursor : " << toBSON();
}
-void ParallelSortClusteredCursor::fullInit() {
- startInit();
- finishInit();
+void ParallelSortClusteredCursor::fullInit(OperationContext* txn) {
+ startInit(txn);
+ finishInit(txn);
}
void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
@@ -520,10 +520,11 @@ void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
forceReload = tries > 2;
}
-void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS,
+void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn,
+ const NamespaceString& staleNS,
bool forceReload,
bool fullReload) {
- auto status = grid.catalogCache()->getDatabase(staleNS.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, staleNS.db().toString());
if (!status.isOK()) {
warning() << "cannot reload database info for stale namespace " << staleNS.ns();
return;
@@ -532,7 +533,7 @@ void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS,
shared_ptr<DBConfig> config = status.getValue();
// Reload db if needed, make sure it works
- if (fullReload && !config->reload()) {
+ if (fullReload && !config->reload(txn)) {
// We didn't find the db after reload, the db may have been dropped, reset this ptr
config.reset();
}
@@ -541,7 +542,7 @@ void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS,
warning() << "cannot reload database info for stale namespace " << staleNS.ns();
} else {
// Reload chunk manager, potentially forcing the namespace
- config->getChunkManagerIfExists(staleNS.ns(), true, forceReload);
+ config->getChunkManagerIfExists(txn, staleNS.ns(), true, forceReload);
}
}
@@ -626,7 +627,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(PCStatePtr state,
}
}
-void ParallelSortClusteredCursor::startInit() {
+void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());
@@ -649,7 +650,7 @@ void ParallelSortClusteredCursor::startInit() {
{
shared_ptr<DBConfig> config;
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (status.getStatus().code() != ErrorCodes::DatabaseNotFound) {
config = uassertStatusOK(status);
config->getChunkManagerOrPrimary(nss.ns(), manager, primary);
@@ -850,10 +851,10 @@ void ParallelSortClusteredCursor::startInit() {
warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace "
<< staleNS;
- _handleStaleNS(staleNS, forceReload, fullReload);
+ _handleStaleNS(txn, staleNS, forceReload, fullReload);
// Restart with new chunk manager
- startInit();
+ startInit(txn);
return;
} catch (SocketException& e) {
warning() << "socket exception when initializing on " << shardId
@@ -924,7 +925,7 @@ void ParallelSortClusteredCursor::startInit() {
}
}
-void ParallelSortClusteredCursor::finishInit() {
+void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
bool specialVersion = _cInfo.versionedNS.size() > 0;
string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();
@@ -1070,13 +1071,13 @@ void ParallelSortClusteredCursor::finishInit() {
warning() << "versioned ns " << ns << " doesn't match stale config namespace "
<< staleNS;
- _handleStaleNS(staleNS, forceReload, fullReload);
+ _handleStaleNS(txn, staleNS, forceReload, fullReload);
}
}
// Re-establish connections we need to
- startInit();
- finishInit();
+ startInit(txn);
+ finishInit(txn);
return;
}
diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h
index 92132e182a1..ca6faa8644d 100644
--- a/src/mongo/client/parallel.h
+++ b/src/mongo/client/parallel.h
@@ -39,6 +39,7 @@
namespace mongo {
class DBClientCursorHolder;
+class OperationContext;
class StaleConfigException;
class ParallelConnectionMetadata;
@@ -150,7 +151,7 @@ public:
std::string getNS();
/** call before using */
- void init();
+ void init(OperationContext* txn);
bool more();
BSONObj next();
@@ -158,9 +159,9 @@ public:
return "ParallelSort";
}
- void fullInit();
- void startInit();
- void finishInit();
+ void fullInit(OperationContext* txn);
+ void startInit(OperationContext* txn);
+ void finishInit(OperationContext* txn);
bool isCommand() {
return NamespaceString(_qSpec.ns()).isCommand();
@@ -217,7 +218,10 @@ private:
const StaleConfigException& e,
bool& forceReload,
bool& fullReload);
- void _handleStaleNS(const NamespaceString& staleNS, bool forceReload, bool fullReload);
+ void _handleStaleNS(OperationContext* txn,
+ const NamespaceString& staleNS,
+ bool forceReload,
+ bool fullReload);
bool _didInit;
bool _done;
diff --git a/src/mongo/db/auth/authorization_manager.cpp b/src/mongo/db/auth/authorization_manager.cpp
index 4430c43a5ab..37136f69114 100644
--- a/src/mongo/db/auth/authorization_manager.cpp
+++ b/src/mongo/db/auth/authorization_manager.cpp
@@ -413,18 +413,20 @@ Status AuthorizationManager::getUserDescription(OperationContext* txn,
return _externalState->getUserDescription(txn, userName, result);
}
-Status AuthorizationManager::getRoleDescription(const RoleName& roleName,
+Status AuthorizationManager::getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result) {
- return _externalState->getRoleDescription(roleName, showPrivileges, result);
+ return _externalState->getRoleDescription(txn, roleName, showPrivileges, result);
}
-Status AuthorizationManager::getRoleDescriptionsForDB(const std::string dbname,
+Status AuthorizationManager::getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
vector<BSONObj>* result) {
return _externalState->getRoleDescriptionsForDB(
- dbname, showPrivileges, showBuiltinRoles, result);
+ txn, dbname, showPrivileges, showBuiltinRoles, result);
}
Status AuthorizationManager::acquireUser(OperationContext* txn,
diff --git a/src/mongo/db/auth/authorization_manager.h b/src/mongo/db/auth/authorization_manager.h
index 9c7fdbaf9d0..995207723c8 100644
--- a/src/mongo/db/auth/authorization_manager.h
+++ b/src/mongo/db/auth/authorization_manager.h
@@ -232,7 +232,10 @@ public:
*
* If the role does not exist, returns ErrorCodes::RoleNotFound.
*/
- Status getRoleDescription(const RoleName& roleName, bool showPrivileges, BSONObj* result);
+ Status getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
+ bool showPrivileges,
+ BSONObj* result);
/**
* Writes into "result" documents describing the roles that are defined on the given
@@ -247,7 +250,8 @@ public:
* the document will contain a "warnings" array, with std::string messages describing
* inconsistencies.
*/
- Status getRoleDescriptionsForDB(const std::string dbname,
+ Status getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
std::vector<BSONObj>* result);
diff --git a/src/mongo/db/auth/authz_manager_external_state.h b/src/mongo/db/auth/authz_manager_external_state.h
index e3bdcdb8c43..0efac6ec80b 100644
--- a/src/mongo/db/auth/authz_manager_external_state.h
+++ b/src/mongo/db/auth/authz_manager_external_state.h
@@ -103,7 +103,8 @@ public:
*
* If the role does not exist, returns ErrorCodes::RoleNotFound.
*/
- virtual Status getRoleDescription(const RoleName& roleName,
+ virtual Status getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result) = 0;
@@ -120,7 +121,8 @@ public:
* the document will contain a "warnings" array, with std::string messages describing
* inconsistencies.
*/
- virtual Status getRoleDescriptionsForDB(const std::string dbname,
+ virtual Status getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
std::vector<BSONObj>* result) = 0;
diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp
index 7f410b605af..449c1e9dbfe 100644
--- a/src/mongo/db/auth/authz_manager_external_state_local.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp
@@ -227,7 +227,8 @@ Status AuthzManagerExternalStateLocal::_getUserDocument(OperationContext* txn,
return status;
}
-Status AuthzManagerExternalStateLocal::getRoleDescription(const RoleName& roleName,
+Status AuthzManagerExternalStateLocal::getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result) {
stdx::lock_guard<stdx::mutex> lk(_roleGraphMutex);
@@ -286,7 +287,8 @@ Status AuthzManagerExternalStateLocal::_getRoleDescription_inlock(const RoleName
return Status::OK();
}
-Status AuthzManagerExternalStateLocal::getRoleDescriptionsForDB(const std::string dbname,
+Status AuthzManagerExternalStateLocal::getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
vector<BSONObj>* result) {
diff --git a/src/mongo/db/auth/authz_manager_external_state_local.h b/src/mongo/db/auth/authz_manager_external_state_local.h
index f761279d03a..536404dcd51 100644
--- a/src/mongo/db/auth/authz_manager_external_state_local.h
+++ b/src/mongo/db/auth/authz_manager_external_state_local.h
@@ -67,10 +67,12 @@ public:
virtual Status getUserDescription(OperationContext* txn,
const UserName& userName,
BSONObj* result);
- virtual Status getRoleDescription(const RoleName& roleName,
+ virtual Status getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result);
- virtual Status getRoleDescriptionsForDB(const std::string dbname,
+ virtual Status getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
std::vector<BSONObj>* result);
diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp
index b71cf0cf08c..ecb2d014c4f 100644
--- a/src/mongo/db/auth/authz_manager_external_state_s.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp
@@ -70,7 +70,7 @@ Status AuthzManagerExternalStateMongos::getStoredAuthorizationVersion(OperationC
BSONObj getParameterCmd = BSON("getParameter" << 1 << authSchemaVersionServerParameter << 1);
BSONObjBuilder builder;
const bool ok =
- grid.catalogManager()->runUserManagementReadCommand("admin", getParameterCmd, &builder);
+ grid.catalogManager(txn)->runUserManagementReadCommand("admin", getParameterCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -96,7 +96,7 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn
<< "showCredentials" << true);
BSONObjBuilder builder;
const bool ok =
- grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
+ grid.catalogManager(txn)->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -116,7 +116,8 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn
return Status::OK();
}
-Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleName,
+Status AuthzManagerExternalStateMongos::getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result) {
BSONObj rolesInfoCmd =
@@ -126,7 +127,7 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN
<< roleName.getDB())) << "showPrivileges" << showPrivileges);
BSONObjBuilder builder;
const bool ok =
- grid.catalogManager()->runUserManagementReadCommand("admin", rolesInfoCmd, &builder);
+ grid.catalogManager(txn)->runUserManagementReadCommand("admin", rolesInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -146,7 +147,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN
return Status::OK();
}
-Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::string dbname,
+Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
std::vector<BSONObj>* result) {
@@ -154,7 +156,7 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri
<< "showBuiltinRoles" << showBuiltinRoles);
BSONObjBuilder builder;
const bool ok =
- grid.catalogManager()->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder);
+ grid.catalogManager(txn)->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder);
BSONObj cmdResult = builder.obj();
if (!ok) {
return Command::getStatusFromCommandResult(cmdResult);
@@ -169,7 +171,7 @@ bool AuthzManagerExternalStateMongos::hasAnyPrivilegeDocuments(OperationContext*
BSONObj usersInfoCmd = BSON("usersInfo" << 1);
BSONObjBuilder builder;
const bool ok =
- grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
+ grid.catalogManager(txn)->runUserManagementReadCommand("admin", usersInfoCmd, &builder);
if (!ok) {
// If we were unable to complete the query,
// it's best to assume that there _are_ privilege documents. This might happen
diff --git a/src/mongo/db/auth/authz_manager_external_state_s.h b/src/mongo/db/auth/authz_manager_external_state_s.h
index 8de98a53552..bf9ca876c43 100644
--- a/src/mongo/db/auth/authz_manager_external_state_s.h
+++ b/src/mongo/db/auth/authz_manager_external_state_s.h
@@ -57,10 +57,12 @@ public:
virtual Status getUserDescription(OperationContext* txn,
const UserName& userName,
BSONObj* result);
- virtual Status getRoleDescription(const RoleName& roleName,
+ virtual Status getRoleDescription(OperationContext* txn,
+ const RoleName& roleName,
bool showPrivileges,
BSONObj* result);
- virtual Status getRoleDescriptionsForDB(const std::string dbname,
+ virtual Status getRoleDescriptionsForDB(OperationContext* txn,
+ const std::string dbname,
bool showPrivileges,
bool showBuiltinRoles,
std::vector<BSONObj>* result);
diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp
index dd44b200f60..8ad27ca4484 100644
--- a/src/mongo/db/auth/user_cache_invalidator_job.cpp
+++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp
@@ -88,10 +88,10 @@ public:
} exportedIntervalParam;
-StatusWith<OID> getCurrentCacheGeneration() {
+StatusWith<OID> getCurrentCacheGeneration(OperationContext* txn) {
try {
BSONObjBuilder result;
- const bool ok = grid.catalogManager()->runUserManagementReadCommand(
+ const bool ok = grid.catalogManager(txn)->runUserManagementReadCommand(
"admin", BSON("_getUserCacheGeneration" << 1), &result);
if (!ok) {
return Command::getStatusFromCommandResult(result.obj());
@@ -107,8 +107,10 @@ StatusWith<OID> getCurrentCacheGeneration() {
} // namespace
UserCacheInvalidator::UserCacheInvalidator(AuthorizationManager* authzManager)
- : _authzManager(authzManager) {
- StatusWith<OID> currentGeneration = getCurrentCacheGeneration();
+ : _authzManager(authzManager) {}
+
+void UserCacheInvalidator::initialize(OperationContext* txn) {
+ StatusWith<OID> currentGeneration = getCurrentCacheGeneration(txn);
if (currentGeneration.isOK()) {
_previousCacheGeneration = currentGeneration.getValue();
return;
@@ -145,7 +147,8 @@ void UserCacheInvalidator::run() {
break;
}
- StatusWith<OID> currentGeneration = getCurrentCacheGeneration();
+ auto txn = cc().makeOperationContext();
+ StatusWith<OID> currentGeneration = getCurrentCacheGeneration(txn.get());
if (!currentGeneration.isOK()) {
if (currentGeneration.getStatus().code() == ErrorCodes::CommandNotFound) {
warning() << "_getUserCacheGeneration command not found on config server(s), "
diff --git a/src/mongo/db/auth/user_cache_invalidator_job.h b/src/mongo/db/auth/user_cache_invalidator_job.h
index 3eb173b0a56..3b1c2544939 100644
--- a/src/mongo/db/auth/user_cache_invalidator_job.h
+++ b/src/mongo/db/auth/user_cache_invalidator_job.h
@@ -33,6 +33,7 @@
namespace mongo {
class AuthorizationManager;
+class OperationContext;
/**
* Background job that runs only in mongos and periodically checks in with the config servers
@@ -42,7 +43,9 @@ class AuthorizationManager;
*/
class UserCacheInvalidator : public BackgroundJob {
public:
- explicit UserCacheInvalidator(AuthorizationManager* authzManager);
+ UserCacheInvalidator(AuthorizationManager* authzManager);
+
+ void initialize(OperationContext* txn);
protected:
virtual std::string name() const;
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 2f25a87719a..ea5ec9092ad 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -284,8 +284,8 @@ public:
// Counter for unknown commands
static Counter64 unknownCommands;
- /** @return if command was found */
- static void runAgainstRegistered(const char* ns,
+ static void runAgainstRegistered(OperationContext* txn,
+ const char* ns,
BSONObj& jsobj,
BSONObjBuilder& anObjBuilder,
int queryOptions = 0);
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 62b17b562f6..5529b5cc5a0 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1627,7 +1627,7 @@ public:
result.append("result", config.outputOptions.collectionName);
}
- auto status = grid.catalogCache()->getDatabase(dbname);
+ auto status = grid.catalogCache()->getDatabase(txn, dbname);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -1636,7 +1636,7 @@ public:
vector<ChunkPtr> chunks;
if (confOut->isSharded(config.outputOptions.finalNamespace)) {
- ChunkManagerPtr cm = confOut->getChunkManager(config.outputOptions.finalNamespace);
+ ChunkManagerPtr cm = confOut->getChunkManager(txn, config.outputOptions.finalNamespace);
// Fetch result from other shards 1 chunk at a time. It would be better to do
// just one big $or query, but then the sorting would not be efficient.
@@ -1670,7 +1670,7 @@ public:
BSONObj sortKey = BSON("_id" << 1);
ParallelSortClusteredCursor cursor(
servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout);
- cursor.init();
+ cursor.init(txn);
int chunkSize = 0;
while (cursor.more() || !values.empty()) {
diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp
index 1645692daf4..b98ae7a5643 100644
--- a/src/mongo/db/commands/user_management_commands.cpp
+++ b/src/mongo/db/commands/user_management_commands.cpp
@@ -154,7 +154,8 @@ Status getCurrentUserRoles(OperationContext* txn,
* same database as the role it is being added to (or that the role being added to is from the
* "admin" database.
*/
-Status checkOkayToGrantRolesToRole(const RoleName& role,
+Status checkOkayToGrantRolesToRole(OperationContext* txn,
+ const RoleName& role,
const std::vector<RoleName> rolesToAdd,
AuthorizationManager* authzManager) {
for (std::vector<RoleName>::const_iterator it = rolesToAdd.begin(); it != rolesToAdd.end();
@@ -174,7 +175,7 @@ Status checkOkayToGrantRolesToRole(const RoleName& role,
}
BSONObj roleToAddDoc;
- Status status = authzManager->getRoleDescription(roleToAdd, false, &roleToAddDoc);
+ Status status = authzManager->getRoleDescription(txn, roleToAdd, false, &roleToAddDoc);
if (status == ErrorCodes::RoleNotFound) {
return Status(ErrorCodes::RoleNotFound,
"Cannot grant nonexistent role " + roleToAdd.toString());
@@ -725,7 +726,7 @@ public:
// Role existence has to be checked after acquiring the update lock
for (size_t i = 0; i < args.roles.size(); ++i) {
BSONObj ignored;
- status = authzManager->getRoleDescription(args.roles[i], false, &ignored);
+ status = authzManager->getRoleDescription(txn, args.roles[i], false, &ignored);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -842,7 +843,7 @@ public:
if (args.hasRoles) {
for (size_t i = 0; i < args.roles.size(); ++i) {
BSONObj ignored;
- status = authzManager->getRoleDescription(args.roles[i], false, &ignored);
+ status = authzManager->getRoleDescription(txn, args.roles[i], false, &ignored);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1077,7 +1078,7 @@ public:
for (vector<RoleName>::iterator it = roles.begin(); it != roles.end(); ++it) {
RoleName& roleName = *it;
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, false, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1157,7 +1158,7 @@ public:
for (vector<RoleName>::iterator it = roles.begin(); it != roles.end(); ++it) {
RoleName& roleName = *it;
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, false, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1381,7 +1382,7 @@ public:
}
// Role existence has to be checked after acquiring the update lock
- status = checkOkayToGrantRolesToRole(args.roleName, args.roles, authzManager);
+ status = checkOkayToGrantRolesToRole(txn, args.roleName, args.roles, authzManager);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1471,13 +1472,13 @@ public:
// Role existence has to be checked after acquiring the update lock
BSONObj ignored;
- status = authzManager->getRoleDescription(args.roleName, false, &ignored);
+ status = authzManager->getRoleDescription(txn, args.roleName, false, &ignored);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
if (args.hasRoles) {
- status = checkOkayToGrantRolesToRole(args.roleName, args.roles, authzManager);
+ status = checkOkayToGrantRolesToRole(txn, args.roleName, args.roles, authzManager);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1568,7 +1569,7 @@ public:
}
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, true, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, true, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1680,7 +1681,7 @@ public:
}
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, true, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, true, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1798,13 +1799,13 @@ public:
// Role existence has to be checked after acquiring the update lock
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, false, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
// Check for cycles
- status = checkOkayToGrantRolesToRole(roleName, rolesToAdd, authzManager);
+ status = checkOkayToGrantRolesToRole(txn, roleName, rolesToAdd, authzManager);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1897,7 +1898,7 @@ public:
}
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, false, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -1991,7 +1992,7 @@ public:
}
BSONObj roleDoc;
- status = authzManager->getRoleDescription(roleName, false, &roleDoc);
+ status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -2262,7 +2263,7 @@ public:
if (args.allForDB) {
std::vector<BSONObj> rolesDocs;
status = getGlobalAuthorizationManager()->getRoleDescriptionsForDB(
- dbname, args.showPrivileges, args.showBuiltinRoles, &rolesDocs);
+ txn, dbname, args.showPrivileges, args.showBuiltinRoles, &rolesDocs);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
@@ -2274,7 +2275,7 @@ public:
for (size_t i = 0; i < args.roleNames.size(); ++i) {
BSONObj roleDetails;
status = getGlobalAuthorizationManager()->getRoleDescription(
- args.roleNames[i], args.showPrivileges, &roleDetails);
+ txn, args.roleNames[i], args.showPrivileges, &roleDetails);
if (status.code() == ErrorCodes::RoleNotFound) {
continue;
}
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 93638a041dd..f103b0b1bae 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -1159,7 +1159,16 @@ void exitCleanly(ExitCode code) {
getGlobalServiceContext()->setKillAllOperations();
repl::getGlobalReplicationCoordinator()->shutdown();
- auto catalogMgr = grid.catalogManager();
+
+ Client& client = cc();
+ ServiceContext::UniqueOperationContext uniqueTxn;
+ OperationContext* txn = client.getOperationContext();
+ if (!txn) {
+ uniqueTxn = client.makeOperationContext();
+ txn = uniqueTxn.get();
+ }
+
+ auto catalogMgr = grid.catalogManager(txn);
if (catalogMgr) {
catalogMgr->shutDown();
}
diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h
index 172ba5f5d5d..3d380605d58 100644
--- a/src/mongo/db/operation_context_noop.h
+++ b/src/mongo/db/operation_context_noop.h
@@ -54,9 +54,20 @@ public:
OperationContextNoop(Client* client, unsigned int opId, Locker* locker, RecoveryUnit* ru)
: OperationContext(client, opId, locker), _recoveryUnit(ru) {
_locker.reset(lockState());
+
+ if (client) {
+ stdx::lock_guard<Client> lk(*client);
+ client->setOperationContext(this);
+ }
}
- virtual ~OperationContextNoop() = default;
+ virtual ~OperationContextNoop() {
+ auto client = getClient();
+ if (client) {
+ stdx::lock_guard<Client> lk(*client);
+ client->resetOperationContext();
+ }
+ }
virtual RecoveryUnit* recoveryUnit() const override {
return _recoveryUnit.get();
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8236d936c21..9ff279cdc45 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -955,7 +955,7 @@ MoveTimingHelper::~MoveTimingHelper() {
_b.append("errmsg", *_cmdErrmsg);
}
- grid.catalogManager()->logChange(
+ grid.catalogManager(_txn)->logChange(
_txn->getClient()->clientAddress(true), (string) "moveChunk." + _where, _ns, _b.obj());
} catch (const std::exception& e) {
warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what()
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 9f99cdce5a5..6010fa8a513 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -324,15 +324,15 @@ private:
}
if (ShardingState::get(txn)->enabled()) {
- if (configdb == ShardingState::get(txn)->getConfigServer())
+ if (configdb == ShardingState::get(txn)->getConfigServer(txn))
return true;
result.append("configdb",
- BSON("stored" << ShardingState::get(txn)->getConfigServer() << "given"
+ BSON("stored" << ShardingState::get(txn)->getConfigServer(txn) << "given"
<< configdb));
errmsg = str::stream() << "mongos specified a different config database string : "
- << "stored : " << ShardingState::get(txn)->getConfigServer()
+ << "stored : " << ShardingState::get(txn)->getConfigServer(txn)
<< " vs given : " << configdb;
return false;
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index cab256a66f6..4aee88c7307 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -84,11 +84,11 @@ bool ShardingState::enabled() {
return _enabled;
}
-string ShardingState::getConfigServer() {
+string ShardingState::getConfigServer(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_enabled);
- return grid.catalogManager()->connectionString().toString();
+ return grid.catalogManager(txn)->connectionString().toString();
}
string ShardingState::getShardName() {
@@ -545,7 +545,7 @@ Status ShardingState::doRefreshMetadata(OperationContext* txn,
shared_ptr<CollectionMetadata> remoteMetadata(remoteMetadataRaw);
Timer refreshTimer;
- Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(),
+ Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(txn),
ns,
getShardName(),
fullReload ? NULL : beforeMetadata.get(),
@@ -738,7 +738,7 @@ Status ShardingState::doRefreshMetadata(OperationContext* txn,
return Status::OK();
}
-void ShardingState::appendInfo(BSONObjBuilder& builder) {
+void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
builder.appendBool("enabled", _enabled);
@@ -746,7 +746,7 @@ void ShardingState::appendInfo(BSONObjBuilder& builder) {
return;
}
- builder.append("configServer", grid.catalogManager()->connectionString().toString());
+ builder.append("configServer", grid.catalogManager(txn)->connectionString().toString());
builder.append("shardName", _shardName);
BSONObjBuilder versionB(builder.subobjStart("versions"));
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 2a9d835027f..ff6248783f3 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -72,7 +72,7 @@ public:
bool enabled();
- std::string getConfigServer();
+ std::string getConfigServer(OperationContext* txn);
std::string getShardName();
// Initialize sharding state and begin authenticating outgoing connections and handling
@@ -149,7 +149,7 @@ public:
const std::string& ns,
ChunkVersion* latestShardVersion);
- void appendInfo(BSONObjBuilder& b);
+ void appendInfo(OperationContext* txn, BSONObjBuilder& b);
// querying support
diff --git a/src/mongo/db/storage/record_store_test_harness.h b/src/mongo/db/storage/record_store_test_harness.h
index 69d7640047f..cc613c3bd3b 100644
--- a/src/mongo/db/storage/record_store_test_harness.h
+++ b/src/mongo/db/storage/record_store_test_harness.h
@@ -57,7 +57,7 @@ public:
}
std::unique_ptr<OperationContext> newOperationContext() {
- return newOperationContext(client());
+ return newOperationContext(nullptr);
}
/**
diff --git a/src/mongo/db/storage/record_store_test_touch.cpp b/src/mongo/db/storage/record_store_test_touch.cpp
index c765bc2638b..14181ec0c9b 100644
--- a/src/mongo/db/storage/record_store_test_touch.cpp
+++ b/src/mongo/db/storage/record_store_test_touch.cpp
@@ -51,7 +51,8 @@ TEST(RecordStoreTestHarness, TouchEmpty) {
}
{
- unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
+ unique_ptr<OperationContext> opCtx(
+ harnessHelper->newOperationContext(harnessHelper->client()));
{
BSONObjBuilder stats;
Status status = rs->touch(opCtx.get(), &stats);
@@ -93,7 +94,8 @@ TEST(RecordStoreTestHarness, TouchNonEmpty) {
}
{
- unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
+ unique_ptr<OperationContext> opCtx(
+ harnessHelper->newOperationContext(harnessHelper->client()));
{
BSONObjBuilder stats;
// XXX does not verify the collection was loaded into cache
@@ -116,7 +118,8 @@ TEST(RecordStoreTestHarness, TouchEmptyWithNullStats) {
}
{
- unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
+ unique_ptr<OperationContext> opCtx(
+ harnessHelper->newOperationContext(harnessHelper->client()));
Status status = rs->touch(opCtx.get(), NULL /* stats output */);
ASSERT(status.isOK() || status.code() == ErrorCodes::CommandNotSupported);
}
@@ -155,7 +158,8 @@ TEST(RecordStoreTestHarness, TouchNonEmptyWithNullStats) {
}
{
- unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
+ unique_ptr<OperationContext> opCtx(
+ harnessHelper->newOperationContext(harnessHelper->client()));
// XXX does not verify the collection was loaded into cache
// (even if supported by storage engine)
Status status = rs->touch(opCtx.get(), NULL /* stats output */);
diff --git a/src/mongo/dbtests/chunk_manager_tests.cpp b/src/mongo/dbtests/chunk_manager_tests.cpp
index 9d64c226df2..9a04ff1bb3a 100644
--- a/src/mongo/dbtests/chunk_manager_tests.cpp
+++ b/src/mongo/dbtests/chunk_manager_tests.cpp
@@ -111,7 +111,7 @@ protected:
ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
ChunkManager manager(_collName, shardKeyPattern, false);
- manager.createFirstChunks(_shardId, &splitKeys, NULL);
+ manager.createFirstChunks(&_txn, _shardId, &splitKeys, NULL);
}
};
@@ -181,7 +181,7 @@ TEST_F(ChunkManagerTests, Basic) {
collType.setDropped(false);
ChunkManager manager(collType);
- manager.loadExistingRanges(nullptr);
+ manager.loadExistingRanges(&_txn, nullptr);
ASSERT(manager.getVersion().epoch() == version.epoch());
ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
@@ -196,7 +196,7 @@ TEST_F(ChunkManagerTests, Basic) {
// Make new manager load chunk diff
ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique());
- newManager.loadExistingRanges(&manager);
+ newManager.loadExistingRanges(&_txn, &manager);
ASSERT(newManager.getVersion().toLong() == laterVersion.toLong());
ASSERT(newManager.getVersion().epoch() == laterVersion.epoch());
diff --git a/src/mongo/dbtests/config_upgrade_tests.cpp b/src/mongo/dbtests/config_upgrade_tests.cpp
index 7e1eb958c46..a108ebcd524 100644
--- a/src/mongo/dbtests/config_upgrade_tests.cpp
+++ b/src/mongo/dbtests/config_upgrade_tests.cpp
@@ -165,7 +165,7 @@ TEST_F(ConfigUpgradeTests, EmptyVersion) {
// Zero version (no version doc)
VersionType oldVersion;
- Status status = getConfigVersion(grid.catalogManager(), &oldVersion);
+ Status status = getConfigVersion(grid.catalogManager(&_txn), &oldVersion);
ASSERT(status.isOK());
ASSERT_EQUALS(oldVersion.getMinCompatibleVersion(), 0);
@@ -185,7 +185,7 @@ TEST_F(ConfigUpgradeTests, ClusterIDVersion) {
newVersion.clear();
// Current Version w/o clusterId (invalid!)
- Status status = getConfigVersion(grid.catalogManager(), &newVersion);
+ Status status = getConfigVersion(grid.catalogManager(&_txn), &newVersion);
ASSERT(!status.isOK());
newVersion.clear();
@@ -201,7 +201,7 @@ TEST_F(ConfigUpgradeTests, ClusterIDVersion) {
newVersion.clear();
// Current version w/ clusterId (valid!)
- status = getConfigVersion(grid.catalogManager(), &newVersion);
+ status = getConfigVersion(grid.catalogManager(&_txn), &newVersion);
ASSERT(status.isOK());
ASSERT_EQUALS(newVersion.getMinCompatibleVersion(), MIN_COMPATIBLE_CONFIG_VERSION);
@@ -218,8 +218,8 @@ TEST_F(ConfigUpgradeTests, InitialUpgrade) {
VersionType versionOld;
VersionType version;
string errMsg;
- bool result =
- checkAndUpgradeConfigVersion(grid.catalogManager(), false, &versionOld, &version, &errMsg);
+ bool result = checkAndUpgradeConfigVersion(
+ grid.catalogManager(&_txn), false, &versionOld, &version, &errMsg);
ASSERT(result);
ASSERT_EQUALS(versionOld.getCurrentVersion(), 0);
@@ -241,8 +241,8 @@ TEST_F(ConfigUpgradeTests, BadVersionUpgrade) {
VersionType versionOld;
VersionType version;
string errMsg;
- bool result =
- checkAndUpgradeConfigVersion(grid.catalogManager(), false, &versionOld, &version, &errMsg);
+ bool result = checkAndUpgradeConfigVersion(
+ grid.catalogManager(&_txn), false, &versionOld, &version, &errMsg);
ASSERT(!result);
}
@@ -257,11 +257,11 @@ TEST_F(ConfigUpgradeTests, CheckMongoVersion) {
storeShardsAndPings(5, 10); // 5 shards, 10 pings
// Our version is >= 2.2, so this works
- Status status = checkClusterMongoVersions(grid.catalogManager(), "2.2");
+ Status status = checkClusterMongoVersions(grid.catalogManager(&_txn), "2.2");
ASSERT(status.isOK());
// Our version is < 9.9, so this doesn't work (until we hit v99.99)
- status = checkClusterMongoVersions(grid.catalogManager(), "99.99");
+ status = checkClusterMongoVersions(grid.catalogManager(&_txn), "99.99");
ASSERT(status.code() == ErrorCodes::RemoteValidationError);
}
diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp
index 6fd5d7dc923..9f181f49ac9 100644
--- a/src/mongo/dbtests/framework.cpp
+++ b/src/mongo/dbtests/framework.cpp
@@ -70,12 +70,16 @@ int runDbTests(int argc, char** argv) {
ShardingState::get(getGlobalServiceContext())->initialize("$dummy:10000");
// Note: ShardingState::initialize also initializes the distLockMgr.
- auto distLockMgr =
- dynamic_cast<LegacyDistLockManager*>(grid.catalogManager()->getDistLockManager());
- if (distLockMgr) {
- distLockMgr->enablePinger(false);
+ {
+ auto txn = cc().makeOperationContext();
+ auto distLockMgr = dynamic_cast<LegacyDistLockManager*>(
+ grid.catalogManager(txn.get())->getDistLockManager());
+ if (distLockMgr) {
+ distLockMgr->enablePinger(false);
+ }
}
+
int ret = unittest::Suite::run(frameworkGlobalParams.suites,
frameworkGlobalParams.filter,
frameworkGlobalParams.runsPerTest);
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 4e5f49f169c..676f1fe21ad 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -155,6 +155,7 @@ env.CppUnitTest(
'cluster_ops',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/range_arithmetic',
+ '$BUILD_DIR/mongo/db/service_context',
]
)
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index ac476a8c67e..fdd575c196c 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -80,7 +80,8 @@ Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {}
Balancer::~Balancer() = default;
-int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks,
+int Balancer::_moveChunks(OperationContext* txn,
+ const vector<shared_ptr<MigrateInfo>>& candidateChunks,
const WriteConcernOptions* writeConcern,
bool waitForDelete) {
int movedCount = 0;
@@ -89,7 +90,7 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks
// If the balancer was disabled since we started this round, don't start new chunks
// moves.
const auto balSettingsResult =
- grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey);
+ grid.catalogManager(txn)->getGlobalSettings(SettingsType::BalancerDocKey);
const bool isBalSettingsAbsent =
balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument;
@@ -121,25 +122,25 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks
const NamespaceString nss(migrateInfo->ns);
try {
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
fassert(28628, status.getStatus());
shared_ptr<DBConfig> cfg = status.getValue();
// NOTE: We purposely do not reload metadata here, since _doBalanceRound already
// tried to do so once.
- shared_ptr<ChunkManager> cm = cfg->getChunkManager(migrateInfo->ns);
+ shared_ptr<ChunkManager> cm = cfg->getChunkManager(txn, migrateInfo->ns);
invariant(cm);
- ChunkPtr c = cm->findIntersectingChunk(migrateInfo->chunk.min);
+ ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min);
if (c->getMin().woCompare(migrateInfo->chunk.min) ||
c->getMax().woCompare(migrateInfo->chunk.max)) {
// Likely a split happened somewhere, so force reload the chunk manager
- cm = cfg->getChunkManager(migrateInfo->ns, true);
+ cm = cfg->getChunkManager(txn, migrateInfo->ns, true);
invariant(cm);
- c = cm->findIntersectingChunk(migrateInfo->chunk.min);
+ c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min);
if (c->getMin().woCompare(migrateInfo->chunk.min) ||
c->getMax().woCompare(migrateInfo->chunk.max)) {
@@ -151,7 +152,8 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks
}
BSONObj res;
- if (c->moveAndCommit(migrateInfo->to,
+ if (c->moveAndCommit(txn,
+ migrateInfo->to,
Chunk::MaxChunkSize,
writeConcern,
waitForDelete,
@@ -167,20 +169,20 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks
if (res["chunkTooBig"].trueValue()) {
// Reload just to be safe
- cm = cfg->getChunkManager(migrateInfo->ns);
+ cm = cfg->getChunkManager(txn, migrateInfo->ns);
invariant(cm);
- c = cm->findIntersectingChunk(migrateInfo->chunk.min);
+ c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min);
log() << "performing a split because migrate failed for size reasons";
- Status status = c->split(Chunk::normal, NULL, NULL);
+ Status status = c->split(txn, Chunk::normal, NULL, NULL);
log() << "split results: " << status;
if (!status.isOK()) {
log() << "marking chunk as jumbo: " << c->toString();
- c->markAsJumbo();
+ c->markAsJumbo(txn);
// We increment moveCount so we do another round right away
movedCount++;
@@ -195,7 +197,7 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks
return movedCount;
}
-void Balancer::_ping(bool waiting) {
+void Balancer::_ping(OperationContext* txn, bool waiting) {
MongosType mType;
mType.setName(_myid);
mType.setPing(jsTime());
@@ -203,12 +205,12 @@ void Balancer::_ping(bool waiting) {
mType.setWaiting(waiting);
mType.setMongoVersion(versionString);
- grid.catalogManager()->update(MongosType::ConfigNS,
- BSON(MongosType::name(_myid)),
- BSON("$set" << mType.toBSON()),
- true,
- false,
- NULL);
+ grid.catalogManager(txn)->update(MongosType::ConfigNS,
+ BSON(MongosType::name(_myid)),
+ BSON("$set" << mType.toBSON()),
+ true,
+ false,
+ NULL);
}
bool Balancer::_checkOIDs() {
@@ -282,11 +284,12 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) {
}
}
-void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) {
+void Balancer::_doBalanceRound(OperationContext* txn,
+ vector<shared_ptr<MigrateInfo>>* candidateChunks) {
invariant(candidateChunks);
vector<CollectionType> collections;
- Status collsStatus = grid.catalogManager()->getCollections(nullptr, &collections);
+ Status collsStatus = grid.catalogManager(txn)->getCollections(nullptr, &collections);
if (!collsStatus.isOK()) {
warning() << "Failed to retrieve the set of collections during balancing round "
<< collsStatus;
@@ -304,7 +307,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
//
// TODO: skip unresponsive shards and mark information as stale.
ShardInfoMap shardInfo;
- Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);
+ Status loadStatus = DistributionStatus::populateShardInfoMap(txn, &shardInfo);
if (!loadStatus.isOK()) {
warning() << "failed to load shard metadata" << causedBy(loadStatus);
return;
@@ -328,10 +331,10 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
}
std::vector<ChunkType> allNsChunks;
- grid.catalogManager()->getChunks(BSON(ChunkType::ns(nss.ns())),
- BSON(ChunkType::min() << 1),
- boost::none, // all chunks
- &allNsChunks);
+ grid.catalogManager(txn)->getChunks(BSON(ChunkType::ns(nss.ns())),
+ BSON(ChunkType::min() << 1),
+ boost::none, // all chunks
+ &allNsChunks);
set<BSONObj> allChunkMinimums;
map<string, vector<ChunkType>> shardToChunksMap;
@@ -362,7 +365,8 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
{
vector<TagsType> collectionTags;
- uassertStatusOK(grid.catalogManager()->getTagsForCollection(nss.ns(), &collectionTags));
+ uassertStatusOK(
+ grid.catalogManager(txn)->getTagsForCollection(nss.ns(), &collectionTags));
for (const auto& tt : collectionTags) {
ranges.push_back(
TagRange(tt.getMinKey().getOwned(), tt.getMaxKey().getOwned(), tt.getTag()));
@@ -372,7 +376,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
}
}
- auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!statusGetDb.isOK()) {
warning() << "could not load db config to balance collection [" << nss.ns()
<< "]: " << statusGetDb.getStatus();
@@ -383,7 +387,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
// This line reloads the chunk manager once if this process doesn't know the collection
// is sharded yet.
- shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(nss.ns(), true);
+ shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(txn, nss.ns(), true);
if (!cm) {
warning() << "could not load chunks to balance " << nss.ns() << " collection";
continue;
@@ -405,12 +409,12 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks)
log() << "nss: " << nss.ns() << " need to split on " << min
<< " because there is a range there";
- ChunkPtr c = cm->findIntersectingChunk(min);
+ ChunkPtr c = cm->findIntersectingChunk(txn, min);
vector<BSONObj> splitPoints;
splitPoints.push_back(min);
- Status status = c->multiSplit(splitPoints, NULL);
+ Status status = c->multiSplit(txn, splitPoints, NULL);
if (!status.isOK()) {
error() << "split failed: " << status;
} else {
@@ -479,6 +483,8 @@ void Balancer::run() {
const int sleepTime = 10;
while (!inShutdown()) {
+ auto txn = cc().makeOperationContext();
+
Timer balanceRoundTimer;
ActionLogType actionLog;
@@ -487,7 +493,7 @@ void Balancer::run() {
try {
// ping has to be first so we keep things in the config server in sync
- _ping();
+ _ping(txn.get());
BSONObj balancerResult;
@@ -495,10 +501,10 @@ void Balancer::run() {
Shard::reloadShardInfo();
// refresh chunk size (even though another balancer might be active)
- Chunk::refreshChunkSize();
+ Chunk::refreshChunkSize(txn.get());
auto balSettingsResult =
- grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey);
+ grid.catalogManager(txn.get())->getGlobalSettings(SettingsType::BalancerDocKey);
const bool isBalSettingsAbsent =
balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument;
if (!balSettingsResult.isOK() && !isBalSettingsAbsent) {
@@ -514,7 +520,7 @@ void Balancer::run() {
LOG(1) << "skipping balancing round because balancing is disabled";
// Ping again so scripts can determine if we're active without waiting
- _ping(true);
+ _ping(txn.get(), true);
sleepsecs(sleepTime);
continue;
@@ -523,14 +529,14 @@ void Balancer::run() {
uassert(13258, "oids broken after resetting!", _checkOIDs());
{
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
+ auto scopedDistLock = grid.catalogManager(txn.get())->getDistLockManager()->lock(
"balancer", "doing balance round");
if (!scopedDistLock.isOK()) {
LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus());
// Ping again so scripts can determine if we're active without waiting
- _ping(true);
+ _ping(txn.get(), true);
sleepsecs(sleepTime); // no need to wake up soon
continue;
@@ -550,14 +556,14 @@ void Balancer::run() {
<< (writeConcern.get() ? writeConcern->toBSON().toString() : "default");
vector<shared_ptr<MigrateInfo>> candidateChunks;
- _doBalanceRound(&candidateChunks);
+ _doBalanceRound(txn.get(), &candidateChunks);
if (candidateChunks.size() == 0) {
LOG(1) << "no need to move any chunk";
_balancedLastTime = 0;
} else {
_balancedLastTime =
- _moveChunks(candidateChunks, writeConcern.get(), waitForDelete);
+ _moveChunks(txn.get(), candidateChunks, writeConcern.get(), waitForDelete);
}
actionLog.setDetails(boost::none,
@@ -566,13 +572,13 @@ void Balancer::run() {
_balancedLastTime);
actionLog.setTime(jsTime());
- grid.catalogManager()->logAction(actionLog);
+ grid.catalogManager(txn.get())->logAction(actionLog);
LOG(1) << "*** end of balancing round";
}
// Ping again so scripts can determine if we're active without waiting
- _ping(true);
+ _ping(txn.get(), true);
sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime);
} catch (std::exception& e) {
@@ -585,7 +591,7 @@ void Balancer::run() {
actionLog.setDetails(string(e.what()), balanceRoundTimer.millis(), 0, 0);
actionLog.setTime(jsTime());
- grid.catalogManager()->logAction(actionLog);
+ grid.catalogManager(txn.get())->logAction(actionLog);
// Sleep a fair amount before retrying because of the error
sleepsecs(sleepTime);
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index d3dc9b39045..27b9d217640 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -37,6 +37,7 @@ namespace mongo {
class BalancerPolicy;
struct MigrateInfo;
+class OperationContext;
struct WriteConcernOptions;
/**
@@ -92,7 +93,8 @@ private:
* @param candidateChunks (IN/OUT) filled with candidate chunks, one per collection, that could
* possibly be moved
*/
- void _doBalanceRound(std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks);
+ void _doBalanceRound(OperationContext* txn,
+ std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks);
/**
* Issues chunk migration request, one at a time.
@@ -102,14 +104,15 @@ private:
* @param waitForDelete wait for deletes to complete after each chunk move
* @return number of chunks effectively moved
*/
- int _moveChunks(const std::vector<std::shared_ptr<MigrateInfo>>& candidateChunks,
+ int _moveChunks(OperationContext* txn,
+ const std::vector<std::shared_ptr<MigrateInfo>>& candidateChunks,
const WriteConcernOptions* writeConcern,
bool waitForDelete);
/**
* Marks this balancer as being live on the config server(s).
*/
- void _ping(bool waiting = false);
+ void _ping(OperationContext* txn, bool waiting = false);
/**
* @return true if all the servers listed in configdb as being shards are reachable and are
diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp
index 18228e092e6..55aa205dfd7 100644
--- a/src/mongo/s/balancer_policy.cpp
+++ b/src/mongo/s/balancer_policy.cpp
@@ -273,10 +273,10 @@ void DistributionStatus::dump() const {
}
}
-Status DistributionStatus::populateShardInfoMap(ShardInfoMap* shardInfo) {
+Status DistributionStatus::populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo) {
try {
vector<ShardType> shards;
- Status status = grid.catalogManager()->getAllShards(&shards);
+ Status status = grid.catalogManager(txn)->getAllShards(&shards);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/s/balancer_policy.h b/src/mongo/s/balancer_policy.h
index 16614f424f1..e9d5c8b1f1d 100644
--- a/src/mongo/s/balancer_policy.h
+++ b/src/mongo/s/balancer_policy.h
@@ -38,6 +38,7 @@
namespace mongo {
class ChunkManager;
+class OperationContext;
struct ChunkInfo {
const BSONObj min;
@@ -199,7 +200,7 @@ public:
* Retrieves shard metadata information from the config server as well as some stats
* from the shards.
*/
- static Status populateShardInfoMap(ShardInfoMap* shardInfo);
+ static Status populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo);
/**
* Note: jumbo and versions are not set.
diff --git a/src/mongo/s/catalog/catalog_cache.cpp b/src/mongo/s/catalog/catalog_cache.cpp
index 416a0a7368a..2ec63337bc9 100644
--- a/src/mongo/s/catalog/catalog_cache.cpp
+++ b/src/mongo/s/catalog/catalog_cache.cpp
@@ -46,7 +46,8 @@ CatalogCache::CatalogCache(CatalogManager* catalogManager) : _catalogManager(cat
invariant(_catalogManager);
}
-StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(const string& dbName) {
+StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* txn,
+ const string& dbName) {
stdx::lock_guard<stdx::mutex> guard(_mutex);
ShardedDatabasesMap::iterator it = _databases.find(dbName);
@@ -61,7 +62,7 @@ StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(const string& dbName)
}
shared_ptr<DBConfig> db = std::make_shared<DBConfig>(dbName, status.getValue());
- db->load();
+ db->load(txn);
invariant(_databases.insert(std::make_pair(dbName, db)).second);
diff --git a/src/mongo/s/catalog/catalog_cache.h b/src/mongo/s/catalog/catalog_cache.h
index 2547da29960..85f5a87a494 100644
--- a/src/mongo/s/catalog/catalog_cache.h
+++ b/src/mongo/s/catalog/catalog_cache.h
@@ -39,6 +39,7 @@ namespace mongo {
class CatalogManager;
class DBConfig;
+class OperationContext;
template <typename T>
class StatusWith;
@@ -63,7 +64,8 @@ public:
* @param dbname The name of the database (must not contain dots, etc).
* @return The database if it exists, NULL otherwise.
*/
- StatusWith<std::shared_ptr<DBConfig>> getDatabase(const std::string& dbName);
+ StatusWith<std::shared_ptr<DBConfig>> getDatabase(OperationContext* txn,
+ const std::string& dbName);
/**
* Removes the database information for the specified name from the cache, so that the
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 4c9bac8c522..98a02836ed7 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -332,13 +332,13 @@ Status CatalogManagerLegacy::shardCollection(OperationContext* txn,
txn->getClient()->clientAddress(true), "shardCollection.start", ns, collectionDetail.obj());
shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique));
- manager->createFirstChunks(dbPrimaryShardId, &initPoints, &initShardIds);
- manager->loadExistingRanges(nullptr);
+ manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds);
+ manager->loadExistingRanges(txn, nullptr);
CollectionInfo collInfo;
collInfo.useChunkManager(manager);
- collInfo.save(ns);
- manager->reload(true);
+ collInfo.save(txn, ns);
+ manager->reload(txn, true);
// Tell the primary mongod to refresh its data
// TODO: Think the real fix here is for mongos to just
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 6b14ca34876..1d45a4988cb 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -209,13 +209,13 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
}
shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique));
- manager->createFirstChunks(dbPrimaryShardId, &initPoints, &initShardIds);
- manager->loadExistingRanges(nullptr);
+ manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds);
+ manager->loadExistingRanges(txn, nullptr);
CollectionInfo collInfo;
collInfo.useChunkManager(manager);
- collInfo.save(ns);
- manager->reload(true);
+ collInfo.save(txn, ns);
+ manager->reload(txn, true);
// Tell the primary mongod to refresh its data
// TODO: Think the real fix here is for mongos to just
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
index 477006aaa2a..6f0a0a6e818 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
@@ -130,7 +130,7 @@ void CatalogManagerReplSetTestFixture::shutdownExecutor() {
}
CatalogManagerReplicaSet* CatalogManagerReplSetTestFixture::catalogManager() const {
- auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager());
+ auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager(_opCtx.get()));
invariant(cm);
return cm;
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 5263e2cbfb7..41923e3a994 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -72,12 +72,14 @@ const int kTooManySplitPoints = 4;
*
* Returns true if the chunk was actually moved.
*/
-bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
+bool tryMoveToOtherShard(OperationContext* txn,
+ const ChunkManager& manager,
+ const ChunkType& chunk) {
// reload sharding metadata before starting migration
- ChunkManagerPtr chunkMgr = manager.reload(false /* just reloaded in mulitsplit */);
+ ChunkManagerPtr chunkMgr = manager.reload(txn, false /* just reloaded in mulitsplit */);
ShardInfoMap shardInfo;
- Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);
+ Status loadStatus = DistributionStatus::populateShardInfoMap(txn, &shardInfo);
if (!loadStatus.isOK()) {
warning() << "failed to load shard metadata while trying to moveChunk after "
@@ -93,7 +95,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
map<string, vector<ChunkType>> shardToChunkMap;
DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap);
- StatusWith<string> tagStatus = grid.catalogManager()->getTagForChunk(manager.getns(), chunk);
+ StatusWith<string> tagStatus = grid.catalogManager(txn)->getTagForChunk(manager.getns(), chunk);
if (!tagStatus.isOK()) {
warning() << "Not auto-moving chunk because of an error encountered while "
<< "checking tag for chunk: " << tagStatus.getStatus();
@@ -114,7 +116,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
return false;
}
- ChunkPtr toMove = chunkMgr->findIntersectingChunk(chunk.getMin());
+ ChunkPtr toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin());
if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) {
LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate "
@@ -132,7 +134,8 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
BSONObj res;
WriteConcernOptions noThrottle;
- if (!toMove->moveAndCommit(newShard->getId(),
+ if (!toMove->moveAndCommit(txn,
+ newShard->getId(),
Chunk::MaxChunkSize,
&noThrottle, /* secondaryThrottle */
false, /* waitForDelete - small chunk, no need */
@@ -142,7 +145,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
}
// update our config
- manager.reload();
+ manager.reload(txn);
return true;
}
@@ -357,7 +360,10 @@ void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) co
}
}
-Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const {
+Status Chunk::split(OperationContext* txn,
+ SplitPointMode mode,
+ size_t* resultingSplits,
+ BSONObj* res) const {
size_t dummy;
if (resultingSplits == NULL) {
resultingSplits = &dummy;
@@ -416,12 +422,12 @@ Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res)
return Status(ErrorCodes::CannotSplit, msg);
}
- Status status = multiSplit(splitPoints, res);
+ Status status = multiSplit(txn, splitPoints, res);
*resultingSplits = splitPoints.size();
return status;
}
-Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const {
+Status Chunk::multiSplit(OperationContext* txn, const vector<BSONObj>& m, BSONObj* res) const {
const size_t maxSplitPoints = 8192;
uassert(10165, "can't split as shard doesn't have a manager", _manager);
@@ -438,7 +444,7 @@ Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const {
cmd.append("max", getMax());
cmd.append("from", getShardId());
cmd.append("splitKeys", m);
- cmd.append("configdb", grid.catalogManager()->connectionString().toString());
+ cmd.append("configdb", grid.catalogManager(txn)->connectionString().toString());
cmd.append("epoch", _manager->getVersion().epoch());
BSONObj cmdObj = cmd.obj();
@@ -458,12 +464,13 @@ Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const {
conn.done();
// force reload of config
- _manager->reload();
+ _manager->reload(txn);
return Status::OK();
}
-bool Chunk::moveAndCommit(const ShardId& toShardId,
+bool Chunk::moveAndCommit(OperationContext* txn,
+ const ShardId& toShardId,
long long chunkSize /* bytes */,
const WriteConcernOptions* writeConcern,
bool waitForDelete,
@@ -490,7 +497,7 @@ bool Chunk::moveAndCommit(const ShardId& toShardId,
builder.append("min", _min);
builder.append("max", _max);
builder.append("maxChunkSizeBytes", chunkSize);
- builder.append("configdb", grid.catalogManager()->connectionString().toString());
+ builder.append("configdb", grid.catalogManager(txn)->connectionString().toString());
// For legacy secondary throttle setting.
bool secondaryThrottle = true;
@@ -517,12 +524,12 @@ bool Chunk::moveAndCommit(const ShardId& toShardId,
// if succeeded, needs to reload to pick up the new location
// if failed, mongos may be stale
// reload is excessive here as the failure could be simply because collection metadata is taken
- _manager->reload();
+ _manager->reload(txn);
return worked;
}
-bool Chunk::splitIfShould(long dataWritten) const {
+bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const {
dassert(ShouldAutoSplit);
LastError::Disabled d(&LastError::get(cc()));
@@ -555,7 +562,7 @@ bool Chunk::splitIfShould(long dataWritten) const {
BSONObj res;
size_t splitCount = 0;
- Status status = split(Chunk::autoSplitInternal, &splitCount, &res);
+ Status status = split(txn, Chunk::autoSplitInternal, &splitCount, &res);
if (!status.isOK()) {
// Split would have issued a message if we got here. This means there wasn't enough
// data to split, so don't want to try again until considerable more data
@@ -571,9 +578,9 @@ bool Chunk::splitIfShould(long dataWritten) const {
_dataWritten = 0;
}
- bool shouldBalance = grid.getConfigShouldBalance();
+ bool shouldBalance = grid.getConfigShouldBalance(txn);
if (shouldBalance) {
- auto status = grid.catalogManager()->getCollection(_manager->getns());
+ auto status = grid.catalogManager(txn)->getCollection(_manager->getns());
if (!status.isOK()) {
log() << "Auto-split for " << _manager->getns()
<< " failed to load collection metadata due to " << status.getStatus();
@@ -603,7 +610,7 @@ bool Chunk::splitIfShould(long dataWritten) const {
chunkToMove.setMin(range["min"].embeddedObject());
chunkToMove.setMax(range["max"].embeddedObject());
- tryMoveToOtherShard(*_manager, chunkToMove);
+ tryMoveToOtherShard(txn, *_manager, chunkToMove);
}
return true;
@@ -694,26 +701,26 @@ string Chunk::toString() const {
return ss.str();
}
-void Chunk::markAsJumbo() const {
+void Chunk::markAsJumbo(OperationContext* txn) const {
// set this first
// even if we can't set it in the db
// at least this mongos won't try and keep moving
_jumbo = true;
- Status result = grid.catalogManager()->update(ChunkType::ConfigNS,
- BSON(ChunkType::name(genID())),
- BSON("$set" << BSON(ChunkType::jumbo(true))),
- false, // upsert
- false, // multi
- NULL);
+ Status result = grid.catalogManager(txn)->update(ChunkType::ConfigNS,
+ BSON(ChunkType::name(genID())),
+ BSON("$set" << BSON(ChunkType::jumbo(true))),
+ false, // upsert
+ false, // multi
+ NULL);
if (!result.isOK()) {
warning() << "couldn't set jumbo for chunk: " << genID() << result.reason();
}
}
-void Chunk::refreshChunkSize() {
+void Chunk::refreshChunkSize(OperationContext* txn) {
auto chunkSizeSettingsResult =
- grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey);
+ grid.catalogManager(txn)->getGlobalSettings(SettingsType::ChunkSizeDocKey);
if (!chunkSizeSettingsResult.isOK()) {
log() << chunkSizeSettingsResult.getStatus();
return;
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index d9872e77d47..1ec9a9447cb 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -34,6 +34,7 @@
namespace mongo {
class ChunkManager;
+class OperationContext;
struct WriteConcernOptions;
/**
@@ -126,7 +127,7 @@ public:
* then we check the real size, and if its too big, we split
* @return if something was split
*/
- bool splitIfShould(long dataWritten) const;
+ bool splitIfShould(OperationContext* txn, long dataWritten) const;
/**
* Splits this chunk at a non-specificed split key to be chosen by the
@@ -138,7 +139,10 @@ public:
*
* @throws UserException
*/
- Status split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const;
+ Status split(OperationContext* txn,
+ SplitPointMode mode,
+ size_t* resultingSplits,
+ BSONObj* res) const;
/**
* Splits this chunk at the given key (or keys)
@@ -148,7 +152,9 @@ public:
*
* @throws UserException
*/
- Status multiSplit(const std::vector<BSONObj>& splitPoints, BSONObj* res) const;
+ Status multiSplit(OperationContext* txn,
+ const std::vector<BSONObj>& splitPoints,
+ BSONObj* res) const;
/**
* Asks the mongod holding this chunk to find a key that approximately divides this chunk in two
@@ -184,7 +190,8 @@ public:
* @param res the object containing details about the migrate execution
* @return true if move was successful
*/
- bool moveAndCommit(const ShardId& to,
+ bool moveAndCommit(OperationContext* txn,
+ const ShardId& to,
long long chunkSize,
const WriteConcernOptions* writeConcern,
bool waitForDelete,
@@ -201,7 +208,7 @@ public:
* marks this chunk as a jumbo chunk
* that means the chunk will be inelligble for migrates
*/
- void markAsJumbo() const;
+ void markAsJumbo(OperationContext* txn) const;
bool isJumbo() const {
return _jumbo;
@@ -210,7 +217,7 @@ public:
/**
* Attempt to refresh maximum chunk size from config.
*/
- static void refreshChunkSize();
+ static void refreshChunkSize(OperationContext* txn);
/**
* sets MaxChunkSize
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index b200ce4e482..638935ddbf5 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -172,7 +172,7 @@ ChunkManager::ChunkManager(const CollectionType& coll)
_version = ChunkVersion::fromBSON(coll.toBSON());
}
-void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) {
+void ChunkManager::loadExistingRanges(OperationContext* txn, const ChunkManager* oldManager) {
int tries = 3;
while (tries--) {
@@ -182,7 +182,7 @@ void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) {
Timer t;
- bool success = _load(chunkMap, shardIds, &shardVersions, oldManager);
+ bool success = _load(txn, chunkMap, shardIds, &shardVersions, oldManager);
if (success) {
log() << "ChunkManager: time to load chunks for " << _ns << ": " << t.millis() << "ms"
<< " sequenceNumber: " << _sequenceNumber << " version: " << _version.toString()
@@ -215,7 +215,8 @@ void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) {
<< " after 3 attempts. Please try again.");
}
-bool ChunkManager::_load(ChunkMap& chunkMap,
+bool ChunkManager::_load(OperationContext* txn,
+ ChunkMap& chunkMap,
set<ShardId>& shardIds,
ShardVersionMap* shardVersions,
const ChunkManager* oldManager) {
@@ -262,7 +263,7 @@ bool ChunkManager::_load(ChunkMap& chunkMap,
std::vector<ChunkType> chunks;
uassertStatusOK(
- grid.catalogManager()->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks));
+ grid.catalogManager(txn)->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks));
int diffsApplied = differ.calculateConfigDiff(chunks);
if (diffsApplied > 0) {
@@ -317,12 +318,12 @@ bool ChunkManager::_load(ChunkMap& chunkMap,
}
}
-shared_ptr<ChunkManager> ChunkManager::reload(bool force) const {
+shared_ptr<ChunkManager> ChunkManager::reload(OperationContext* txn, bool force) const {
const NamespaceString nss(_ns);
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
shared_ptr<DBConfig> config = uassertStatusOK(status);
- return config->getChunkManager(getns(), force);
+ return config->getChunkManager(txn, getns(), force);
}
void ChunkManager::_printChunks() const {
@@ -385,7 +386,8 @@ void ChunkManager::calcInitSplitsAndShards(const ShardId& primaryShardId,
}
}
-void ChunkManager::createFirstChunks(const ShardId& primaryShardId,
+void ChunkManager::createFirstChunks(OperationContext* txn,
+ const ShardId& primaryShardId,
const vector<BSONObj>* initPoints,
const set<ShardId>* initShardIds) {
// TODO distlock?
@@ -417,7 +419,7 @@ void ChunkManager::createFirstChunks(const ShardId& primaryShardId,
BSONObj chunkObj = chunkBuilder.obj();
- Status result = grid.catalogManager()->update(
+ Status result = grid.catalogManager(txn)->update(
ChunkType::ConfigNS, BSON(ChunkType::name(temp.genID())), chunkObj, true, false, NULL);
version.incMinor();
@@ -433,7 +435,7 @@ void ChunkManager::createFirstChunks(const ShardId& primaryShardId,
_version = ChunkVersion(0, 0, version.epoch());
}
-ChunkPtr ChunkManager::findIntersectingChunk(const BSONObj& shardKey) const {
+ChunkPtr ChunkManager::findIntersectingChunk(OperationContext* txn, const BSONObj& shardKey) const {
{
BSONObj chunkMin;
ChunkPtr chunk;
@@ -454,7 +456,7 @@ ChunkPtr ChunkManager::findIntersectingChunk(const BSONObj& shardKey) const {
log() << *chunk;
log() << shardKey;
- reload();
+ reload(txn);
msgasserted(13141, "Chunk map pointed to incorrect chunk");
}
}
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index cb68b42b858..0ca94237920 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -42,6 +42,7 @@ class CanonicalQuery;
class ChunkManager;
class CollectionType;
struct QuerySolutionNode;
+class OperationContext;
typedef std::shared_ptr<ChunkManager> ChunkManagerPtr;
@@ -159,12 +160,13 @@ public:
//
// Creates new chunks based on info in chunk manager
- void createFirstChunks(const ShardId& primaryShardId,
+ void createFirstChunks(OperationContext* txn,
+ const ShardId& primaryShardId,
const std::vector<BSONObj>* initPoints,
const std::set<ShardId>* initShardIds);
// Loads existing ranges based on info in chunk manager
- void loadExistingRanges(const ChunkManager* oldManager);
+ void loadExistingRanges(OperationContext* txn, const ChunkManager* oldManager);
// Helpers for load
@@ -190,7 +192,7 @@ public:
* when the shard key is {a : "hashed"}, you can call
* findIntersectingChunk() on {a : hash("foo") }
*/
- ChunkPtr findIntersectingChunk(const BSONObj& shardKey) const;
+ ChunkPtr findIntersectingChunk(OperationContext* txn, const BSONObj& shardKey) const;
void getShardIdsForQuery(std::set<ShardId>& shardIds, const BSONObj& query) const;
void getAllShardIds(std::set<ShardId>* all) const;
@@ -238,11 +240,13 @@ public:
int getCurrentDesiredChunkSize() const;
- std::shared_ptr<ChunkManager> reload(bool force = true) const; // doesn't modify self!
+ std::shared_ptr<ChunkManager> reload(OperationContext* txn,
+ bool force = true) const; // doesn't modify self!
private:
// returns true if load was consistent
- bool _load(ChunkMap& chunks,
+ bool _load(OperationContext* txn,
+ ChunkMap& chunks,
std::set<ShardId>& shardIds,
ShardVersionMap* shardVersions,
const ChunkManager* oldManager);
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 446d4a5fafd..ee98e9a8481 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -266,8 +266,8 @@ bool wasMetadataRefreshed(const ChunkManagerPtr& managerA,
ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
: _nss(nss), _needsTargetingRefresh(false) {}
-Status ChunkManagerTargeter::init() {
- auto status = grid.implicitCreateDb(_nss.db().toString());
+Status ChunkManagerTargeter::init(OperationContext* txn) {
+ auto status = grid.implicitCreateDb(txn, _nss.db().toString());
if (!status.isOK()) {
return status.getStatus();
}
@@ -282,7 +282,9 @@ const NamespaceString& ChunkManagerTargeter::getNS() const {
return _nss;
}
-Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const {
+Status ChunkManagerTargeter::targetInsert(OperationContext* txn,
+ const BSONObj& doc,
+ ShardEndpoint** endpoint) const {
BSONObj shardKey;
if (_manager) {
@@ -310,7 +312,7 @@ Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** en
// Target the shard key or database primary
if (!shardKey.isEmpty()) {
- return targetShardKey(shardKey, doc.objsize(), endpoint);
+ return targetShardKey(txn, shardKey, doc.objsize(), endpoint);
} else {
if (!_primary) {
return Status(ErrorCodes::NamespaceNotFound,
@@ -323,7 +325,8 @@ Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** en
}
}
-Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc,
+Status ChunkManagerTargeter::targetUpdate(OperationContext* txn,
+ const BatchedUpdateDocument& updateDoc,
vector<ShardEndpoint*>* endpoints) const {
//
// Update targeting may use either the query or the update. This is to support save-style
@@ -411,7 +414,7 @@ Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc
// We can't rely on our query targeting to be exact
ShardEndpoint* endpoint = NULL;
Status result =
- targetShardKey(shardKey, (query.objsize() + updateExpr.objsize()), &endpoint);
+ targetShardKey(txn, shardKey, (query.objsize() + updateExpr.objsize()), &endpoint);
endpoints->push_back(endpoint);
return result;
} else if (updateType == UpdateType_OpStyle) {
@@ -421,7 +424,8 @@ Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc
}
}
-Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc,
+Status ChunkManagerTargeter::targetDelete(OperationContext* txn,
+ const BatchedDeleteDocument& deleteDoc,
vector<ShardEndpoint*>* endpoints) const {
BSONObj shardKey;
@@ -456,7 +460,7 @@ Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc
if (!shardKey.isEmpty()) {
// We can't rely on our query targeting to be exact
ShardEndpoint* endpoint = NULL;
- Status result = targetShardKey(shardKey, 0, &endpoint);
+ Status result = targetShardKey(txn, shardKey, 0, &endpoint);
endpoints->push_back(endpoint);
return result;
} else {
@@ -498,12 +502,13 @@ Status ChunkManagerTargeter::targetQuery(const BSONObj& query,
return Status::OK();
}
-Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
+Status ChunkManagerTargeter::targetShardKey(OperationContext* txn,
+ const BSONObj& shardKey,
long long estDataSize,
ShardEndpoint** endpoint) const {
invariant(NULL != _manager);
- ChunkPtr chunk = _manager->findIntersectingChunk(shardKey);
+ ChunkPtr chunk = _manager->findIntersectingChunk(txn, shardKey);
// Track autosplit stats for sharded collections
// Note: this is only best effort accounting and is not accurate.
@@ -597,7 +602,7 @@ const TargeterStats* ChunkManagerTargeter::getStats() const {
return &_stats;
}
-Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
+Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* txn, bool* wasChanged) {
bool dummy;
if (!wasChanged) {
wasChanged = &dummy;
@@ -620,7 +625,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
ChunkManagerPtr lastManager = _manager;
ShardPtr lastPrimary = _primary;
- auto status = grid.implicitCreateDb(_nss.db().toString());
+ auto status = grid.implicitCreateDb(txn, _nss.db().toString());
if (!status.isOK()) {
return status.getStatus();
}
@@ -649,7 +654,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
// If didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
// To match previous behavior, we just need an incremental refresh here
- return refreshNow(RefreshType_RefreshChunkManager);
+ return refreshNow(txn, RefreshType_RefreshChunkManager);
}
*wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
@@ -665,10 +670,10 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
if (result == CompareResult_Unknown) {
// Our current shard versions aren't all comparable to the old versions, maybe drop
- return refreshNow(RefreshType_ReloadDatabase);
+ return refreshNow(txn, RefreshType_ReloadDatabase);
} else if (result == CompareResult_LT) {
// Our current shard versions are less than the remote versions, but no drop
- return refreshNow(RefreshType_RefreshChunkManager);
+ return refreshNow(txn, RefreshType_RefreshChunkManager);
}
*wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
@@ -680,8 +685,8 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) {
return Status::OK();
}
-Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) {
- auto status = grid.implicitCreateDb(_nss.db().toString());
+Status ChunkManagerTargeter::refreshNow(OperationContext* txn, RefreshType refreshType) {
+ auto status = grid.implicitCreateDb(txn, _nss.db().toString());
if (!status.isOK()) {
return status.getStatus();
}
@@ -696,7 +701,7 @@ Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) {
try {
// Forces a remote check of the collection info, synchronization between threads
// happens internally.
- config->getChunkManagerIfExists(_nss.ns(), true);
+ config->getChunkManagerIfExists(txn, _nss.ns(), true);
} catch (const DBException& ex) {
return Status(ErrorCodes::UnknownError, ex.toString());
}
@@ -705,8 +710,8 @@ Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) {
try {
// Dumps the db info, reloads it all, synchronization between threads happens
// internally.
- config->reload();
- config->getChunkManagerIfExists(_nss.ns(), true, true);
+ config->reload(txn);
+ config->getChunkManagerIfExists(txn, _nss.ns(), true, true);
} catch (const DBException& ex) {
return Status(ErrorCodes::UnknownError, ex.toString());
}
diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h
index e7c754448c8..e38b9b1742f 100644
--- a/src/mongo/s/chunk_manager_targeter.h
+++ b/src/mongo/s/chunk_manager_targeter.h
@@ -38,6 +38,7 @@ namespace mongo {
class ChunkManager;
struct ChunkVersion;
+class OperationContext;
class Shard;
struct TargeterStats {
@@ -62,19 +63,21 @@ public:
*
* Returns !OK if the information could not be initialized.
*/
- Status init();
+ Status init(OperationContext* txn);
const NamespaceString& getNS() const;
// Returns ShardKeyNotFound if document does not have a full shard key.
- Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const;
+ Status targetInsert(OperationContext* txn, const BSONObj& doc, ShardEndpoint** endpoint) const;
// Returns ShardKeyNotFound if the update can't be targeted without a shard key.
- Status targetUpdate(const BatchedUpdateDocument& updateDoc,
+ Status targetUpdate(OperationContext* txn,
+ const BatchedUpdateDocument& updateDoc,
std::vector<ShardEndpoint*>* endpoints) const;
// Returns ShardKeyNotFound if the delete can't be targeted without a shard key.
- Status targetDelete(const BatchedDeleteDocument& deleteDoc,
+ Status targetDelete(OperationContext* txn,
+ const BatchedDeleteDocument& deleteDoc,
std::vector<ShardEndpoint*>* endpoints) const;
Status targetCollection(std::vector<ShardEndpoint*>* endpoints) const;
@@ -94,7 +97,7 @@ public:
*
* Also see NSTargeter::refreshIfNeeded().
*/
- Status refreshIfNeeded(bool* wasChanged);
+ Status refreshIfNeeded(OperationContext* txn, bool* wasChanged);
/**
* Returns the stats. Note that the returned stats object is still owned by this targeter.
@@ -118,7 +121,7 @@ private:
/**
* Performs an actual refresh from the config server.
*/
- Status refreshNow(RefreshType refreshType);
+ Status refreshNow(OperationContext* txn, RefreshType refreshType);
/**
* Returns a vector of ShardEndpoints where a document might need to be placed.
@@ -140,7 +143,8 @@ private:
* Also has the side effect of updating the chunks stats with an estimate of the amount of
* data targeted at this shard key.
*/
- Status targetShardKey(const BSONObj& doc,
+ Status targetShardKey(OperationContext* txn,
+ const BSONObj& doc,
long long estDataSize,
ShardEndpoint** endpoint) const;
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index c6d765afb5a..3850e6d747f 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -278,7 +278,7 @@ public:
}
}
- void checkVersions(const string& ns) {
+ void checkVersions(OperationContext* txn, const string& ns) {
vector<ShardId> all;
grid.shardRegistry()->getAllShardIds(&all);
@@ -301,7 +301,7 @@ public:
s->created++; // After, so failed creation doesn't get counted
}
- versionManager.checkShardVersionCB(s->avail, ns, false, 1);
+ versionManager.checkShardVersionCB(txn, s->avail, ns, false, 1);
} catch (const DBException& ex) {
warning() << "problem while initially checking shard versions on"
<< " " << shardId << causedBy(ex);
@@ -450,7 +450,10 @@ void ShardConnection::_finishInit() {
_finishedInit = true;
if (versionManager.isVersionableCB(_conn)) {
- _setVersion = versionManager.checkShardVersionCB(this, false, 1);
+ auto& client = cc();
+ auto txn = client.getOperationContext();
+ invariant(txn);
+ _setVersion = versionManager.checkShardVersionCB(txn, this, false, 1);
} else {
// Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
verify(!_manager);
@@ -488,8 +491,8 @@ void ShardConnection::sync() {
ClientConnections::threadInstance()->sync();
}
-void ShardConnection::checkMyConnectionVersions(const string& ns) {
- ClientConnections::threadInstance()->checkVersions(ns);
+void ShardConnection::checkMyConnectionVersions(OperationContext* txn, const string& ns) {
+ ClientConnections::threadInstance()->checkVersions(txn, ns);
}
void ShardConnection::releaseMyConnections() {
diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h
index 8882961bb1c..4bac99332c0 100644
--- a/src/mongo/s/client/shard_connection.h
+++ b/src/mongo/s/client/shard_connection.h
@@ -103,7 +103,7 @@ public:
}
/** checks all of my thread local connections for the version of this ns */
- static void checkMyConnectionVersions(const std::string& ns);
+ static void checkMyConnectionVersions(OperationContext* txn, const std::string& ns);
/**
* Returns all the current sharded connections to the pool.
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index 877fc896a69..459960b83cd 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -111,12 +111,12 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
/**
* Splits the chunks touched based from the targeter stats if needed.
*/
-void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) {
+void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) {
if (!Chunk::ShouldAutoSplit) {
return;
}
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
warning() << "failed to get database config for " << nss
<< " while checking for auto-split: " << status.getStatus();
@@ -138,19 +138,20 @@ void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) {
++it) {
ChunkPtr chunk;
try {
- chunk = chunkManager->findIntersectingChunk(it->first);
+ chunk = chunkManager->findIntersectingChunk(txn, it->first);
} catch (const AssertionException& ex) {
warning() << "could not find chunk while checking for auto-split: " << causedBy(ex);
return;
}
- chunk->splitIfShould(it->second);
+ chunk->splitIfShould(txn, it->second);
}
}
} // namespace
-Status clusterCreateIndex(const string& ns,
+Status clusterCreateIndex(OperationContext* txn,
+ const string& ns,
BSONObj keys,
bool unique,
BatchedCommandResponse* response) {
@@ -173,7 +174,7 @@ Status clusterCreateIndex(const string& ns,
}
ClusterWriter writer(false, 0);
- writer.write(request, response);
+ writer.write(txn, request, response);
if (response->getOk() != 1) {
return Status(static_cast<ErrorCodes::Error>(response->getErrCode()),
@@ -198,7 +199,8 @@ Status clusterCreateIndex(const string& ns,
}
-void ClusterWriter::write(const BatchedCommandRequest& origRequest,
+void ClusterWriter::write(OperationContext* txn,
+ const BatchedCommandRequest& origRequest,
BatchedCommandResponse* response) {
// Add _ids to insert request if req'd
unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest));
@@ -242,10 +244,10 @@ void ClusterWriter::write(const BatchedCommandRequest& origRequest,
const string dbName = nss.db().toString();
if (dbName == "config" || dbName == "admin") {
- grid.catalogManager()->writeConfigServerDirect(request, response);
+ grid.catalogManager(txn)->writeConfigServerDirect(request, response);
} else {
ChunkManagerTargeter targeter(request.getTargetingNSS());
- Status targetInitStatus = targeter.init();
+ Status targetInitStatus = targeter.init(txn);
if (!targetInitStatus.isOK()) {
// Errors will be reported in response if we are unable to target
@@ -257,10 +259,10 @@ void ClusterWriter::write(const BatchedCommandRequest& origRequest,
DBClientShardResolver resolver;
DBClientMultiCommand dispatcher;
BatchWriteExec exec(&targeter, &resolver, &dispatcher);
- exec.executeBatch(request, response);
+ exec.executeBatch(txn, request, response);
if (_autoSplit) {
- splitIfNeeded(request.getNS(), *targeter.getStats());
+ splitIfNeeded(txn, request.getNS(), *targeter.getStats());
}
_stats->setShardStats(exec.releaseStats());
diff --git a/src/mongo/s/cluster_write.h b/src/mongo/s/cluster_write.h
index 9fd177f756a..0ef15426da7 100644
--- a/src/mongo/s/cluster_write.h
+++ b/src/mongo/s/cluster_write.h
@@ -37,12 +37,15 @@ namespace mongo {
class ClusterWriterStats;
class BatchWriteExecStats;
+class OperationContext;
class ClusterWriter {
public:
ClusterWriter(bool autoSplit, int timeoutMillis);
- void write(const BatchedCommandRequest& request, BatchedCommandResponse* response);
+ void write(OperationContext* txn,
+ const BatchedCommandRequest& request,
+ BatchedCommandResponse* response);
const ClusterWriterStats& getStats();
@@ -73,7 +76,8 @@ private:
*
* Note: response can be NULL if you don't care about the write statistics.
*/
-Status clusterCreateIndex(const std::string& ns,
+Status clusterCreateIndex(OperationContext* txn,
+ const std::string& ns,
BSONObj keys,
bool unique,
BatchedCommandResponse* response);
diff --git a/src/mongo/s/commands/cluster_add_shard_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_cmd.cpp
index c772863d8db..7f0a8e5dbd0 100644
--- a/src/mongo/s/commands/cluster_add_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_add_shard_cmd.cpp
@@ -120,7 +120,7 @@ public:
audit::logAddShard(ClientBasic::getCurrent(), name, servers.toString(), maxSize);
- StatusWith<string> addShardResult = grid.catalogManager()->addShard(
+ StatusWith<string> addShardResult = grid.catalogManager(txn)->addShard(
txn, (name.empty() ? nullptr : &name), servers, maxSize);
if (!addShardResult.isOK()) {
log() << "addShard request '" << cmdObj << "'"
diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp
index 93a87c9f497..89b623f7078 100644
--- a/src/mongo/s/commands/cluster_commands_common.cpp
+++ b/src/mongo/s/commands/cluster_commands_common.cpp
@@ -88,7 +88,7 @@ void Future::CommandResult::init() {
}
}
-bool Future::CommandResult::join(int maxRetries) {
+bool Future::CommandResult::join(OperationContext* txn, int maxRetries) {
if (_done) {
return _ok;
}
@@ -129,7 +129,7 @@ bool Future::CommandResult::join(int maxRetries) {
}
if (i >= maxRetries / 2) {
- if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) {
+ if (!versionManager.forceRemoteCheckShardVersionCB(txn, staleNS)) {
error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e);
throw e;
}
@@ -141,7 +141,7 @@ bool Future::CommandResult::join(int maxRetries) {
warning() << "no collection namespace in stale config exception "
<< "for lazy command " << _cmd << ", could not refresh " << staleNS;
} else {
- versionManager.checkShardVersionCB(_conn, staleNS, false, 1);
+ versionManager.checkShardVersionCB(txn, _conn, staleNS, false, 1);
}
LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e);
diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h
index b1b41459f01..39ef671c39c 100644
--- a/src/mongo/s/commands/cluster_commands_common.h
+++ b/src/mongo/s/commands/cluster_commands_common.h
@@ -74,7 +74,7 @@ public:
blocks until command is done
returns ok()
*/
- bool join(int maxRetries = 1);
+ bool join(OperationContext* txn, int maxRetries = 1);
private:
CommandResult(const std::string& server,
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index 3ceef4cc01a..3253afc6ba2 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -154,7 +154,8 @@ public:
}
vector<Strategy::CommandResult> countResult;
- Strategy::commandOp(dbname, countCmdBuilder.done(), options, fullns, filter, &countResult);
+ Strategy::commandOp(
+ txn, dbname, countCmdBuilder.done(), options, fullns, filter, &countResult);
long long total = 0;
BSONObjBuilder shardSubTotal(result.subobjStart("shards"));
@@ -212,7 +213,8 @@ public:
Timer timer;
vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults);
+ Strategy::commandOp(
+ txn, dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults);
long long millisElapsed = timer.millis();
diff --git a/src/mongo/s/commands/cluster_drop_database_cmd.cpp b/src/mongo/s/commands/cluster_drop_database_cmd.cpp
index 8f2781fb4dc..e913c5f8a7e 100644
--- a/src/mongo/s/commands/cluster_drop_database_cmd.cpp
+++ b/src/mongo/s/commands/cluster_drop_database_cmd.cpp
@@ -90,7 +90,7 @@ public:
// Refresh the database metadata
grid.catalogCache()->invalidate(dbname);
- auto status = grid.catalogCache()->getDatabase(dbname);
+ auto status = grid.catalogCache()->getDatabase(txn, dbname);
if (!status.isOK()) {
if (status == ErrorCodes::DatabaseNotFound) {
result.append("info", "database does not exist");
diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
index 62749bfbab2..37ef726dfb1 100644
--- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
+++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
@@ -104,7 +104,7 @@ public:
return false;
}
- Status status = grid.catalogManager()->enableSharding(dbname);
+ Status status = grid.catalogManager(txn)->enableSharding(dbname);
if (status.isOK()) {
audit::logEnableSharding(ClientBasic::getCurrent(), dbname);
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 9bd33cb24f0..fe9ab7787d5 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -82,7 +82,7 @@ public:
BSONObjBuilder* out) const {
const string ns = parseNsCollectionRequired(dbName, cmdObj);
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
uassertStatusOK(status);
shared_ptr<DBConfig> conf = status.getValue();
@@ -92,7 +92,7 @@ public:
if (!conf->isShardingEnabled() || !conf->isSharded(ns)) {
shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
} else {
- shared_ptr<ChunkManager> chunkMgr = _getChunkManager(conf, ns);
+ shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns);
const BSONObj query = cmdObj.getObjectField("query");
@@ -102,7 +102,7 @@ public:
}
BSONObj shardKey = status.getValue();
- ChunkPtr chunk = chunkMgr->findIntersectingChunk(shardKey);
+ ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey);
shard = grid.shardRegistry()->getShard(chunk->getShardId());
}
@@ -145,12 +145,12 @@ public:
// findAndModify should only be creating database if upsert is true, but this would
// require that the parsing be pulled into this function.
- auto conf = uassertStatusOK(grid.implicitCreateDb(dbName));
+ auto conf = uassertStatusOK(grid.implicitCreateDb(txn, dbName));
if (!conf->isShardingEnabled() || !conf->isSharded(ns)) {
return _runCommand(conf, conf->getPrimaryId(), ns, cmdObj, result);
}
- shared_ptr<ChunkManager> chunkMgr = _getChunkManager(conf, ns);
+ shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns);
const BSONObj query = cmdObj.getObjectField("query");
@@ -161,13 +161,13 @@ public:
}
BSONObj shardKey = status.getValue();
- ChunkPtr chunk = chunkMgr->findIntersectingChunk(shardKey);
+ ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey);
bool ok = _runCommand(conf, chunk->getShardId(), ns, cmdObj, result);
if (ok) {
// check whether split is necessary (using update object for size heuristic)
if (Chunk::ShouldAutoSplit) {
- chunk->splitIfShould(cmdObj.getObjectField("update").objsize());
+ chunk->splitIfShould(txn, cmdObj.getObjectField("update").objsize());
}
}
@@ -175,8 +175,10 @@ public:
}
private:
- shared_ptr<ChunkManager> _getChunkManager(shared_ptr<DBConfig> conf, const string& ns) const {
- shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(ns);
+ shared_ptr<ChunkManager> _getChunkManager(OperationContext* txn,
+ shared_ptr<DBConfig> conf,
+ const string& ns) const {
+ shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(txn, ns);
massert(13002, "shard internal error chunk manager should never be null", chunkMgr);
return chunkMgr;
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index c00cc24368f..8fe8b2724a1 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -130,7 +130,8 @@ public:
Timer timer;
vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(dbname,
+ Strategy::commandOp(txn,
+ dbname,
explainCmdBob.obj(),
lpq->getOptions(),
fullns,
diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
index 54d88f7d6bb..9ac3d9bdb61 100644
--- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
@@ -94,7 +94,7 @@ public:
result, Status(ErrorCodes::InvalidNamespace, "no namespace specified"));
}
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -106,7 +106,7 @@ public:
Status(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded."));
}
- ChunkManagerPtr cm = config->getChunkManagerIfExists(nss.ns());
+ ChunkManagerPtr cm = config->getChunkManagerIfExists(txn, nss.ns());
if (!cm) {
errmsg = "no chunk manager?";
return false;
diff --git a/src/mongo/s/commands/cluster_index_filter_cmd.cpp b/src/mongo/s/commands/cluster_index_filter_cmd.cpp
index d3465a4bb48..05c5724ee3e 100644
--- a/src/mongo/s/commands/cluster_index_filter_cmd.cpp
+++ b/src/mongo/s/commands/cluster_index_filter_cmd.cpp
@@ -120,7 +120,7 @@ bool ClusterIndexFilterCmd::run(OperationContext* txn,
// Targeted shard commands are generally data-dependent but index filter
// commands are tied to query shape (data has no effect on query shape).
vector<Strategy::CommandResult> results;
- Strategy::commandOp(dbName, cmdObj, options, nss.ns(), BSONObj(), &results);
+ Strategy::commandOp(txn, dbName, cmdObj, options, nss.ns(), BSONObj(), &results);
// Set value of first shard result's "ok" field.
bool clusterCmdResult = true;
diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp
index 98c062c9441..213bcc17b1c 100644
--- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp
+++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp
@@ -158,11 +158,12 @@ public:
bb.append(temp.obj());
}
+ auto catalogManager = grid.catalogManager(txn);
{
// get config db from the config servers
BSONObjBuilder builder;
- if (!grid.catalogManager()->runReadCommand("config", BSON("dbstats" << 1), &builder)) {
+ if (!catalogManager->runReadCommand("config", BSON("dbstats" << 1), &builder)) {
bb.append(BSON("name"
<< "config"));
} else {
@@ -183,7 +184,7 @@ public:
// get admin db from the config servers
BSONObjBuilder builder;
- if (!grid.catalogManager()->runReadCommand("admin", BSON("dbstats" << 1), &builder)) {
+ if (!catalogManager->runReadCommand("admin", BSON("dbstats" << 1), &builder)) {
bb.append(BSON("name"
<< "admin"));
} else {
diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
index 16eccd3c8e1..da852b42ee7 100644
--- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp
+++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
@@ -74,7 +74,7 @@ public:
std::string& errmsg,
BSONObjBuilder& result) {
std::vector<ShardType> shards;
- Status status = grid.catalogManager()->getAllShards(&shards);
+ Status status = grid.catalogManager(txn)->getAllShards(&shards);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index c6ed59b111f..7da7568360a 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -212,7 +212,7 @@ public:
}
// Ensure the input database exists
- auto status = grid.catalogCache()->getDatabase(dbname);
+ auto status = grid.catalogCache()->getDatabase(txn, dbname);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -222,7 +222,7 @@ public:
shared_ptr<DBConfig> confOut;
if (customOutDB) {
// Create the output database implicitly, since we have a custom output requested
- confOut = uassertStatusOK(grid.implicitCreateDb(outDB));
+ confOut = uassertStatusOK(grid.implicitCreateDb(txn, outDB));
} else {
confOut = confIn;
}
@@ -301,7 +301,7 @@ public:
// TODO: take distributed lock to prevent split / migration?
try {
- Strategy::commandOp(dbname, shardedCommand, 0, fullns, q, &mrCommandResults);
+ Strategy::commandOp(txn, dbname, shardedCommand, 0, fullns, q, &mrCommandResults);
} catch (DBException& e) {
e.addContext(str::stream() << "could not run map command on all shards for ns "
<< fullns << " and query " << q);
@@ -422,7 +422,7 @@ public:
// Create the sharded collection if needed
if (!confOut->isSharded(finalColLong)) {
// Enable sharding on db
- confOut->enableSharding();
+ confOut->enableSharding(txn);
// Shard collection according to split points
vector<BSONObj> sortedSplitPts;
@@ -444,7 +444,7 @@ public:
BSONObj sortKey = BSON("_id" << 1);
ShardKeyPattern sortKeyPattern(sortKey);
- Status status = grid.catalogManager()->shardCollection(
+ Status status = grid.catalogManager(txn)->shardCollection(
txn, finalColLong, sortKeyPattern, true, sortedSplitPts, outShardIds);
if (!status.isOK()) {
return appendCommandStatus(result, status);
@@ -454,7 +454,7 @@ public:
map<BSONObj, int> chunkSizes;
{
// Take distributed lock to prevent split / migration.
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
+ auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock(
finalColLong,
"mr-post-process",
stdx::chrono::milliseconds(-1), // retry indefinitely
@@ -469,7 +469,7 @@ public:
try {
Strategy::commandOp(
- outDB, finalCmdObj, 0, finalColLong, BSONObj(), &mrCommandResults);
+ txn, outDB, finalCmdObj, 0, finalColLong, BSONObj(), &mrCommandResults);
ok = true;
} catch (DBException& e) {
e.addContext(str::stream() << "could not run final reduce on all shards for "
@@ -511,19 +511,19 @@ public:
}
// Do the splitting round
- ChunkManagerPtr cm = confOut->getChunkManagerIfExists(finalColLong);
+ ChunkManagerPtr cm = confOut->getChunkManagerIfExists(txn, finalColLong);
for (const auto& chunkSize : chunkSizes) {
BSONObj key = chunkSize.first;
const int size = chunkSize.second;
invariant(size < std::numeric_limits<int>::max());
// key reported should be the chunk's minimum
- ChunkPtr c = cm->findIntersectingChunk(key);
+ ChunkPtr c = cm->findIntersectingChunk(txn, key);
if (!c) {
warning() << "Mongod reported " << size << " bytes inserted for key " << key
<< " but can't find chunk";
} else {
- c->splitIfShould(size);
+ c->splitIfShould(txn, size);
}
}
}
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index d57e2de1cce..ffce75566fc 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -138,7 +138,7 @@ public:
result, Status(ErrorCodes::InvalidNamespace, "no namespace specified"));
}
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -151,7 +151,7 @@ public:
}
// This refreshes the chunk metadata if stale.
- ChunkManagerPtr manager = config->getChunkManagerIfExists(nss.ns(), true);
+ ChunkManagerPtr manager = config->getChunkManagerIfExists(txn, nss.ns(), true);
if (!manager) {
return appendCommandStatus(
result,
@@ -170,14 +170,14 @@ public:
minKey = manager->getShardKeyPattern().normalizeShardKey(minKey);
maxKey = manager->getShardKeyPattern().normalizeShardKey(maxKey);
- ChunkPtr firstChunk = manager->findIntersectingChunk(minKey);
+ ChunkPtr firstChunk = manager->findIntersectingChunk(txn, minKey);
verify(firstChunk);
BSONObjBuilder remoteCmdObjB;
remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::nsField()]);
remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::boundsField()]);
remoteCmdObjB.append(ClusterMergeChunksCommand::configField(),
- grid.catalogManager()->connectionString().toString());
+ grid.catalogManager(txn)->connectionString().toString());
remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk->getShardId());
BSONObj remoteResult;
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 45b9c63cdec..9dc54bb633c 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -116,7 +116,7 @@ public:
result, Status(ErrorCodes::InvalidNamespace, "no namespace specified"));
}
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -125,7 +125,7 @@ public:
}
if (!config->isSharded(nss.ns())) {
- config->reload();
+ config->reload(txn);
if (!config->isSharded(nss.ns())) {
return appendCommandStatus(result,
@@ -164,7 +164,7 @@ public:
}
// This refreshes the chunk metadata if stale.
- ChunkManagerPtr info = config->getChunkManager(nss.ns(), true);
+ ChunkManagerPtr info = config->getChunkManager(txn, nss.ns(), true);
ChunkPtr chunk;
if (!find.isEmpty()) {
@@ -181,7 +181,7 @@ public:
return false;
}
- chunk = info->findIntersectingChunk(shardKey);
+ chunk = info->findIntersectingChunk(txn, shardKey);
verify(chunk.get());
} else {
// Bounds
@@ -197,7 +197,7 @@ public:
BSONObj minKey = info->getShardKeyPattern().normalizeShardKey(bounds[0].Obj());
BSONObj maxKey = info->getShardKeyPattern().normalizeShardKey(bounds[1].Obj());
- chunk = info->findIntersectingChunk(minKey);
+ chunk = info->findIntersectingChunk(txn, minKey);
verify(chunk.get());
if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) {
@@ -238,7 +238,8 @@ public:
}
BSONObj res;
- if (!chunk->moveAndCommit(to->getId(),
+ if (!chunk->moveAndCommit(txn,
+ to->getId(),
maxChunkSizeBytes,
writeConcern.get(),
cmdObj["_waitForDelete"].trueValue(),
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index ca28077860a..1a4788d28f0 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -114,7 +114,7 @@ public:
// Flush all cached information. This can't be perfect, but it's better than nothing.
grid.catalogCache()->invalidate(dbname);
- auto status = grid.catalogCache()->getDatabase(dbname);
+ auto status = grid.catalogCache()->getDatabase(txn, dbname);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -147,8 +147,9 @@ public:
<< " to: " << toShard->toString();
string whyMessage(str::stream() << "Moving primary shard of " << dbname);
+ auto catalogManager = grid.catalogManager(txn);
auto scopedDistLock =
- grid.catalogManager()->getDistLockManager()->lock(dbname + "-movePrimary", whyMessage);
+ catalogManager->getDistLockManager()->lock(dbname + "-movePrimary", whyMessage);
if (!scopedDistLock.isOK()) {
return appendCommandStatus(result, scopedDistLock.getStatus());
@@ -161,7 +162,7 @@ public:
BSONObj moveStartDetails =
_buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls);
- grid.catalogManager()->logChange(
+ catalogManager->logChange(
txn->getClient()->clientAddress(true), "movePrimary.start", dbname, moveStartDetails);
BSONArrayBuilder barr;
@@ -189,7 +190,7 @@ public:
ScopedDbConnection fromconn(fromShard->getConnString());
- config->setPrimary(toShard->getConnString().toString());
+ config->setPrimary(txn, toShard->getConnString().toString());
if (shardedColls.empty()) {
// TODO: Collections can be created in the meantime, and we should handle in the future.
@@ -240,7 +241,7 @@ public:
BSONObj moveFinishDetails =
_buildMoveEntry(dbname, oldPrimary, toShard->toString(), shardedColls);
- grid.catalogManager()->logChange(
+ catalogManager->logChange(
txn->getClient()->clientAddress(true), "movePrimary", dbname, moveFinishDetails);
return true;
}
diff --git a/src/mongo/s/commands/cluster_netstat_cmd.cpp b/src/mongo/s/commands/cluster_netstat_cmd.cpp
index df4c158a8b7..66a9bc8c036 100644
--- a/src/mongo/s/commands/cluster_netstat_cmd.cpp
+++ b/src/mongo/s/commands/cluster_netstat_cmd.cpp
@@ -69,7 +69,7 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) {
- result.append("configserver", grid.catalogManager()->connectionString().toString());
+ result.append("configserver", grid.catalogManager(txn)->connectionString().toString());
result.append("isdbgrid", 1);
return true;
}
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index da0a5dfcd12..ce272013709 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -102,7 +102,7 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbname, cmdObj);
- auto status = grid.catalogCache()->getDatabase(dbname);
+ auto status = grid.catalogCache()->getDatabase(txn, dbname);
if (!status.isOK()) {
return appendEmptyResultSet(result, status.getStatus(), fullns);
}
@@ -137,7 +137,7 @@ public:
// If the first $match stage is an exact match on the shard key, we only have to send it
// to one shard, so send the command to that shard.
BSONObj firstMatchQuery = pipeline->getInitialQuery();
- ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns);
+ ChunkManagerPtr chunkMgr = conf->getChunkManager(txn, fullns);
BSONObj shardKeyMatches = uassertStatusOK(
chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery));
@@ -173,7 +173,8 @@ public:
// Run the command on the shards
// TODO need to make sure cursors are killed if a retry is needed
vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(dbname, shardedCommand, options, fullns, shardQuery, &shardResults);
+ Strategy::commandOp(
+ txn, dbname, shardedCommand, options, fullns, shardQuery, &shardResults);
if (pipeline->isExplain()) {
// This must be checked before we start modifying result.
diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
index 1411f5bc3b1..42f240b7dd3 100644
--- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
+++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
@@ -120,7 +120,7 @@ bool ClusterPlanCacheCmd::run(OperationContext* txn,
// Targeted shard commands are generally data-dependent but plan cache
// commands are tied to query shape (data has no effect on query shape).
vector<Strategy::CommandResult> results;
- Strategy::commandOp(dbName, cmdObj, options, nss.ns(), BSONObj(), &results);
+ Strategy::commandOp(txn, dbName, cmdObj, options, nss.ns(), BSONObj(), &results);
// Set value of first shard result's "ok" field.
bool clusterCmdResult = true;
diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
index b6cc17b9545..fc958920407 100644
--- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
@@ -94,14 +94,15 @@ public:
return appendCommandStatus(result, Status(ErrorCodes::ShardNotFound, msg));
}
+ auto catalogManager = grid.catalogManager(txn);
StatusWith<ShardDrainingStatus> removeShardResult =
- grid.catalogManager()->removeShard(txn, s->getId());
+ catalogManager->removeShard(txn, s->getId());
if (!removeShardResult.isOK()) {
return appendCommandStatus(result, removeShardResult.getStatus());
}
vector<string> databases;
- grid.catalogManager()->getDatabasesForShard(s->getId(), &databases);
+ catalogManager->getDatabasesForShard(s->getId(), &databases);
// Get BSONObj containing:
// 1) note about moving or dropping databases in a shard
@@ -131,10 +132,10 @@ public:
break;
case ShardDrainingStatus::ONGOING: {
vector<ChunkType> chunks;
- Status status = grid.catalogManager()->getChunks(BSON(ChunkType::shard(s->getId())),
- BSONObj(),
- boost::none, // return all
- &chunks);
+ Status status = catalogManager->getChunks(BSON(ChunkType::shard(s->getId())),
+ BSONObj(),
+ boost::none, // return all
+ &chunks);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 1cde82d9bde..c84946580f4 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -120,7 +120,7 @@ public:
Status(ErrorCodes::InvalidNamespace, "invalid collection namespace [" + ns + "]"));
}
- auto config = uassertStatusOK(grid.catalogCache()->getDatabase(nsStr.db().toString()));
+ auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString()));
if (!config->isShardingEnabled()) {
return appendCommandStatus(
result,
@@ -324,7 +324,7 @@ public:
// 5. If no useful index exists, and collection empty, create one on proposedKey.
// Only need to call ensureIndex on primary shard, since indexes get copied to
// receiving shard whenever a migrate occurs.
- Status status = clusterCreateIndex(ns, proposedKey, careAboutUnique, NULL);
+ Status status = clusterCreateIndex(txn, ns, proposedKey, careAboutUnique, NULL);
if (!status.isOK()) {
errmsg = str::stream() << "ensureIndex failed to create index on "
<< "primary shard: " << status.reason();
@@ -397,7 +397,7 @@ public:
audit::logShardCollection(ClientBasic::getCurrent(), ns, proposedKey, careAboutUnique);
- Status status = grid.catalogManager()->shardCollection(
+ Status status = grid.catalogManager(txn)->shardCollection(
txn, ns, proposedShardKey, careAboutUnique, initSplits, std::set<ShardId>{});
if (!status.isOK()) {
return appendCommandStatus(result, status);
@@ -409,7 +409,7 @@ public:
if (isHashedShardKey && isEmpty) {
// Reload the new config info. If we created more than one initial chunk, then
// we need to move them around to balance.
- ChunkManagerPtr chunkManager = config->getChunkManager(ns, true);
+ ChunkManagerPtr chunkManager = config->getChunkManager(txn, ns, true);
ChunkMap chunkMap = chunkManager->getChunkMap();
// 2. Move and commit each "big chunk" to a different shard.
@@ -431,7 +431,7 @@ public:
BSONObj moveResult;
WriteConcernOptions noThrottle;
if (!chunk->moveAndCommit(
- to->getId(), Chunk::MaxChunkSize, &noThrottle, true, 0, moveResult)) {
+ txn, to->getId(), Chunk::MaxChunkSize, &noThrottle, true, 0, moveResult)) {
warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to
<< " while sharding collection " << ns << "."
<< " Reason: " << moveResult;
@@ -443,17 +443,17 @@ public:
}
// Reload the config info, after all the migrations
- chunkManager = config->getChunkManager(ns, true);
+ chunkManager = config->getChunkManager(txn, ns, true);
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
- ChunkPtr currentChunk = chunkManager->findIntersectingChunk(allSplits[0]);
+ ChunkPtr currentChunk = chunkManager->findIntersectingChunk(txn, allSplits[0]);
vector<BSONObj> subSplits;
for (unsigned i = 0; i <= allSplits.size(); i++) {
if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
if (!subSplits.empty()) {
- Status status = currentChunk->multiSplit(subSplits, NULL);
+ Status status = currentChunk->multiSplit(txn, subSplits, NULL);
if (!status.isOK()) {
warning() << "couldn't split chunk " << currentChunk->toString()
<< " while sharding collection " << ns << causedBy(status);
@@ -463,7 +463,7 @@ public:
}
if (i < allSplits.size()) {
- currentChunk = chunkManager->findIntersectingChunk(allSplits[i]);
+ currentChunk = chunkManager->findIntersectingChunk(txn, allSplits[i]);
}
} else {
BSONObj splitPoint(allSplits[i]);
@@ -479,7 +479,7 @@ public:
// Proactively refresh the chunk manager. Not really necessary, but this way it's
// immediately up-to-date the next time it's used.
- config->getChunkManager(ns, true);
+ config->getChunkManager(txn, ns, true);
}
return true;
diff --git a/src/mongo/s/commands/cluster_split_collection_cmd.cpp b/src/mongo/s/commands/cluster_split_collection_cmd.cpp
index fd5f462518c..06a50c1d3cf 100644
--- a/src/mongo/s/commands/cluster_split_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_split_collection_cmd.cpp
@@ -108,14 +108,14 @@ public:
result, Status(ErrorCodes::InvalidNamespace, "no namespace specified"));
}
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
std::shared_ptr<DBConfig> config = status.getValue();
if (!config->isSharded(nss.ns())) {
- config->reload();
+ config->reload(txn);
if (!config->isSharded(nss.ns())) {
return appendCommandStatus(result,
@@ -178,7 +178,7 @@ public:
}
// This refreshes the chunk metadata if stale.
- ChunkManagerPtr info = config->getChunkManager(nss.ns(), true);
+ ChunkManagerPtr info = config->getChunkManager(txn, nss.ns(), true);
ChunkPtr chunk;
if (!find.isEmpty()) {
@@ -195,7 +195,7 @@ public:
return false;
}
- chunk = info->findIntersectingChunk(shardKey);
+ chunk = info->findIntersectingChunk(txn, shardKey);
invariant(chunk.get());
} else if (!bounds.isEmpty()) {
if (!info->getShardKeyPattern().isShardKey(bounds[0].Obj()) ||
@@ -210,7 +210,7 @@ public:
BSONObj minKey = info->getShardKeyPattern().normalizeShardKey(bounds[0].Obj());
BSONObj maxKey = info->getShardKeyPattern().normalizeShardKey(bounds[1].Obj());
- chunk = info->findIntersectingChunk(minKey);
+ chunk = info->findIntersectingChunk(txn, minKey);
invariant(chunk.get());
if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) {
@@ -235,7 +235,7 @@ public:
return appendCommandStatus(result, status);
}
- chunk = info->findIntersectingChunk(middle);
+ chunk = info->findIntersectingChunk(txn, middle);
invariant(chunk.get());
if (chunk->getMin().woCompare(middle) == 0 || chunk->getMax().woCompare(middle) == 0) {
@@ -252,7 +252,7 @@ public:
BSONObj res;
if (middle.isEmpty()) {
- Status status = chunk->split(Chunk::atMedian, NULL, NULL);
+ Status status = chunk->split(txn, Chunk::atMedian, NULL, NULL);
if (!status.isOK()) {
errmsg = "split failed";
result.append("cause", status.toString());
@@ -262,7 +262,7 @@ public:
vector<BSONObj> splitPoints;
splitPoints.push_back(middle);
- Status status = chunk->multiSplit(splitPoints, NULL);
+ Status status = chunk->multiSplit(txn, splitPoints, NULL);
if (!status.isOK()) {
errmsg = "split failed";
result.append("cause", status.toString());
diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp
index d2a6ab667d4..7985a305503 100644
--- a/src/mongo/s/commands/cluster_user_management_commands.cpp
+++ b/src/mongo/s/commands/cluster_user_management_commands.cpp
@@ -78,8 +78,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ return grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
}
virtual void redactForLogging(mutablebson::Document* cmdObj) {
@@ -121,8 +121,8 @@ public:
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -172,8 +172,8 @@ public:
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -212,8 +212,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -260,8 +260,8 @@ public:
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -308,8 +308,8 @@ public:
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -352,7 +352,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result);
+ return grid.catalogManager(txn)->runUserManagementReadCommand(dbname, cmdObj, &result);
}
} cmdUsersInfo;
@@ -385,8 +385,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ return grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
}
} cmdCreateRole;
@@ -419,8 +419,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -459,8 +459,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -499,8 +499,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -539,8 +539,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -579,8 +579,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -622,8 +622,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -666,8 +666,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- const bool ok = grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ const bool ok = grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
AuthorizationManager* authzManager = getGlobalAuthorizationManager();
invariant(authzManager);
@@ -710,7 +710,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result);
+ return grid.catalogManager(txn)->runUserManagementReadCommand(dbname, cmdObj, &result);
}
} cmdRolesInfo;
@@ -797,8 +797,8 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- return grid.catalogManager()->runUserManagementWriteCommand(
- this->name, dbname, cmdObj, &result);
+ return grid.catalogManager(txn)
+ ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result);
}
} cmdMergeAuthzCollections;
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index a39d5f75439..cea3c22307b 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -120,7 +120,7 @@ public:
BatchItemRef targetingBatchItem(&request, 0);
vector<Strategy::CommandResult> shardResults;
Status status =
- _commandOpWrite(dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults);
+ _commandOpWrite(txn, dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults);
if (!status.isOK()) {
return status;
}
@@ -153,7 +153,7 @@ public:
response.setErrCode(ErrorCodes::FailedToParse);
response.setErrMessage(errmsg);
} else {
- writer.write(request, &response);
+ writer.write(txn, request, &response);
}
dassert(response.isValid(NULL));
@@ -222,7 +222,8 @@ private:
*
* Does *not* retry or retarget if the metadata is stale.
*/
- static Status _commandOpWrite(const std::string& dbName,
+ static Status _commandOpWrite(OperationContext* txn,
+ const std::string& dbName,
const BSONObj& command,
BatchItemRef targetingBatchItem,
std::vector<Strategy::CommandResult>* results) {
@@ -231,7 +232,7 @@ private:
ChunkManagerTargeter targeter(
NamespaceString(targetingBatchItem.getRequest()->getTargetingNS()));
- Status status = targeter.init();
+ Status status = targeter.init(txn);
if (!status.isOK())
return status;
@@ -240,17 +241,17 @@ private:
if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
ShardEndpoint* endpoint;
- Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint);
+ Status status = targeter.targetInsert(txn, targetingBatchItem.getDocument(), &endpoint);
if (!status.isOK())
return status;
endpoints.push_back(endpoint);
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
- Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints);
+ Status status = targeter.targetUpdate(txn, *targetingBatchItem.getUpdate(), &endpoints);
if (!status.isOK())
return status;
} else {
invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete);
- Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints);
+ Status status = targeter.targetDelete(txn, *targetingBatchItem.getDelete(), &endpoints);
if (!status.isOK())
return status;
}
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 6bb4d896b43..0e2e18b86b8 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -143,10 +143,13 @@ public:
bool implicitCreateDb = false)
: RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {}
- virtual void getShardIds(const string& dbName, BSONObj& cmdObj, vector<ShardId>& shardIds) {
+ virtual void getShardIds(OperationContext* txn,
+ const string& dbName,
+ BSONObj& cmdObj,
+ vector<ShardId>& shardIds) {
const string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe();
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
uassertStatusOK(status.getStatus());
shared_ptr<DBConfig> conf = status.getValue();
@@ -172,7 +175,7 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
if (!conf->isSharded(fullns)) {
return passthrough(conf, cmdObj, options, result);
}
@@ -397,7 +400,7 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- auto status = grid.implicitCreateDb(dbName);
+ auto status = grid.implicitCreateDb(txn, dbName);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -426,7 +429,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
if (!status.isOK()) {
if (status == ErrorCodes::DatabaseNotFound) {
return true;
@@ -445,7 +448,7 @@ public:
return passthrough(db, cmdObj, result);
}
- uassertStatusOK(grid.catalogManager()->dropCollection(txn, NamespaceString(fullns)));
+ uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(fullns)));
// Force a full reload next time the just dropped namespace is accessed
db->invalidateNs(fullns);
@@ -474,11 +477,11 @@ public:
BSONObjBuilder& result) {
const string fullnsFrom = cmdObj.firstElement().valuestrsafe();
const string dbNameFrom = nsToDatabase(fullnsFrom);
- auto confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(dbNameFrom));
+ auto confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameFrom));
const string fullnsTo = cmdObj["to"].valuestrsafe();
const string dbNameTo = nsToDatabase(fullnsTo);
- auto confTo = uassertStatusOK(grid.catalogCache()->getDatabase(dbNameTo));
+ auto confTo = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameTo));
uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom));
uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo));
@@ -516,7 +519,7 @@ public:
uassert(ErrorCodes::EmptyFieldName, "missing todb argument", !todb.empty());
uassert(ErrorCodes::InvalidNamespace, "invalid todb argument", nsIsDbOnly(todb));
- auto confTo = uassertStatusOK(grid.implicitCreateDb(todb));
+ auto confTo = uassertStatusOK(grid.implicitCreateDb(txn, todb));
uassert(ErrorCodes::IllegalOperation,
"cannot copy to a sharded database",
!confTo->isShardingEnabled());
@@ -529,7 +532,7 @@ public:
uassert(13399, "need a fromdb argument", !fromdb.empty());
shared_ptr<DBConfig> confFrom =
- uassertStatusOK(grid.catalogCache()->getDatabase(fromdb));
+ uassertStatusOK(grid.catalogCache()->getDatabase(txn, fromdb));
uassert(13400, "don't know where source DB is", confFrom);
uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled());
@@ -572,7 +575,7 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
result.appendBool("sharded", false);
result.append("primary", conf->getPrimaryId());
@@ -582,7 +585,7 @@ public:
result.appendBool("sharded", true);
- ChunkManagerPtr cm = conf->getChunkManager(fullns);
+ ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
massert(12594, "how could chunk manager be null!", cm);
BSONObjBuilder shardStats;
@@ -733,12 +736,12 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
return passthrough(conf, cmdObj, result);
}
- ChunkManagerPtr cm = conf->getChunkManager(fullns);
+ ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
massert(13407, "how could chunk manager be null!", cm);
BSONObj min = cmdObj.getObjectField("min");
@@ -841,8 +844,8 @@ public:
Timer timer;
Strategy::CommandResult singleResult;
- Status commandStat =
- Strategy::commandOpUnsharded(dbname, explainCmdBob.obj(), 0, fullns, &singleResult);
+ Status commandStat = Strategy::commandOpUnsharded(
+ txn, dbname, explainCmdBob.obj(), 0, fullns, &singleResult);
if (!commandStat.isOK()) {
return commandStat;
}
@@ -918,7 +921,7 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
if (!status.isOK()) {
return appendEmptyResultSet(result, status.getStatus(), fullns);
}
@@ -928,7 +931,7 @@ public:
return passthrough(conf, cmdObj, options, result);
}
- ChunkManagerPtr cm = conf->getChunkManager(fullns);
+ ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
massert(10420, "how could chunk manager be null!", cm);
BSONObj query = getQuery(cmdObj);
@@ -997,7 +1000,8 @@ public:
Timer timer;
vector<Strategy::CommandResult> shardResults;
- Strategy::commandOp(dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults);
+ Strategy::commandOp(
+ txn, dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults);
long long millisElapsed = timer.millis();
@@ -1037,18 +1041,18 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
return passthrough(conf, cmdObj, result);
}
- ChunkManagerPtr cm = conf->getChunkManager(fullns);
+ ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
massert(13091, "how could chunk manager be null!", cm);
if (cm->getShardKeyPattern().toBSON() == BSON("files_id" << 1)) {
BSONObj finder = BSON("files_id" << cmdObj.firstElement());
vector<Strategy::CommandResult> results;
- Strategy::commandOp(dbName, cmdObj, 0, fullns, finder, &results);
+ Strategy::commandOp(txn, dbName, cmdObj, 0, fullns, finder, &results);
verify(results.size() == 1); // querying on shard key so should only talk to one shard
BSONObj res = results.begin()->result;
@@ -1080,7 +1084,7 @@ public:
vector<Strategy::CommandResult> results;
try {
- Strategy::commandOp(dbName, shardCmd, 0, fullns, finder, &results);
+ Strategy::commandOp(txn, dbName, shardCmd, 0, fullns, finder, &results);
} catch (DBException& e) {
// This is handled below and logged
Strategy::CommandResult errResult;
@@ -1165,12 +1169,12 @@ public:
BSONObjBuilder& result) {
const string fullns = parseNs(dbName, cmdObj);
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) {
return passthrough(conf, cmdObj, options, result);
}
- ChunkManagerPtr cm = conf->getChunkManager(fullns);
+ ChunkManagerPtr cm = conf->getChunkManager(txn, fullns);
massert(13500, "how could chunk manager be null!", cm);
BSONObj query = getQuery(cmdObj);
@@ -1206,7 +1210,7 @@ public:
i != futures.end();
i++) {
shared_ptr<Future::CommandResult> res = *i;
- if (!res->join()) {
+ if (!res->join(txn)) {
errmsg = res->result()["errmsg"].String();
if (res->result().hasField("code")) {
result.append(res->result()["code"]);
@@ -1331,7 +1335,7 @@ public:
// $eval isn't allowed to access sharded collections, but we need to leave the
// shard to detect that.
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
if (!status.isOK()) {
return appendCommandStatus(result, status.getStatus());
}
@@ -1370,7 +1374,7 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- auto status = grid.catalogCache()->getDatabase(dbName);
+ auto status = grid.catalogCache()->getDatabase(txn, dbName);
if (!status.isOK()) {
return appendEmptyResultSet(
result, status.getStatus(), dbName + ".$cmd.listCollections");
@@ -1408,7 +1412,7 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) {
- auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName));
+ auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName));
bool retval = passthrough(conf, cmdObj, result);
const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId());
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
index 4b8d17b4c8c..91b10dd1db0 100644
--- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp
+++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
@@ -62,7 +62,8 @@ BSONObj RunOnAllShardsCommand::specialErrorHandler(const std::string& server,
return originalResult;
}
-void RunOnAllShardsCommand::getShardIds(const std::string& db,
+void RunOnAllShardsCommand::getShardIds(OperationContext* txn,
+ const std::string& db,
BSONObj& cmdObj,
std::vector<ShardId>& shardIds) {
grid.shardRegistry()->getAllShardIds(&shardIds);
@@ -77,11 +78,11 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << cmdObj;
if (_implicitCreateDb) {
- uassertStatusOK(grid.implicitCreateDb(dbName));
+ uassertStatusOK(grid.implicitCreateDb(txn, dbName));
}
std::vector<ShardId> shardIds;
- getShardIds(dbName, cmdObj, shardIds);
+ getShardIds(txn, dbName, cmdObj, shardIds);
std::list<std::shared_ptr<Future::CommandResult>> futures;
for (const ShardId& shardId : shardIds) {
@@ -108,7 +109,7 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
++futuresit, ++shardIdsIt) {
std::shared_ptr<Future::CommandResult> res = *futuresit;
- if (res->join()) {
+ if (res->join(txn)) {
// success :)
BSONObj result = res->result();
results.emplace_back(*shardIdsIt, result);
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.h b/src/mongo/s/commands/run_on_all_shards_cmd.h
index 149887864a7..b1601f9ef90 100644
--- a/src/mongo/s/commands/run_on_all_shards_cmd.h
+++ b/src/mongo/s/commands/run_on_all_shards_cmd.h
@@ -81,7 +81,8 @@ public:
const BSONObj& originalResult) const;
// The default implementation uses all shards.
- virtual void getShardIds(const std::string& db,
+ virtual void getShardIds(OperationContext* txn,
+ const std::string& db,
BSONObj& cmdObj,
std::vector<ShardId>& shardIds);
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index 3ae784329a0..f173bfe05bd 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -62,10 +62,10 @@ using std::string;
using std::unique_ptr;
using std::vector;
-CollectionInfo::CollectionInfo(const CollectionType& coll) {
+CollectionInfo::CollectionInfo(OperationContext* txn, const CollectionType& coll) {
_dropped = coll.getDropped();
- shard(new ChunkManager(coll));
+ shard(txn, new ChunkManager(coll));
_dirty = false;
}
@@ -78,9 +78,9 @@ void CollectionInfo::resetCM(ChunkManager* cm) {
_cm.reset(cm);
}
-void CollectionInfo::shard(ChunkManager* manager) {
+void CollectionInfo::shard(OperationContext* txn, ChunkManager* manager) {
// Do this *first* so we're invisible to everyone else
- manager->loadExistingRanges(nullptr);
+ manager->loadExistingRanges(txn, nullptr);
//
// Collections with no chunks are unsharded, no matter what the collections entry says
@@ -111,7 +111,7 @@ void CollectionInfo::useChunkManager(ChunkManagerPtr manager) {
_dropped = false;
}
-void CollectionInfo::save(const string& ns) {
+void CollectionInfo::save(OperationContext* txn, const string& ns) {
CollectionType coll;
coll.setNs(NamespaceString{ns});
@@ -130,7 +130,7 @@ void CollectionInfo::save(const string& ns) {
coll.setUpdatedAt(Date_t::now());
}
- uassertStatusOK(grid.catalogManager()->updateCollection(ns, coll));
+ uassertStatusOK(grid.catalogManager(txn)->updateCollection(ns, coll));
_dirty = false;
}
@@ -177,7 +177,7 @@ void DBConfig::invalidateNs(const std::string& ns) {
}
}
-void DBConfig::enableSharding() {
+void DBConfig::enableSharding(OperationContext* txn) {
verify(_name != "config");
stdx::lock_guard<stdx::mutex> lk(_lock);
@@ -186,10 +186,10 @@ void DBConfig::enableSharding() {
}
_shardingEnabled = true;
- _save();
+ _save(txn);
}
-bool DBConfig::removeSharding(const string& ns) {
+bool DBConfig::removeSharding(OperationContext* txn, const string& ns) {
if (!_shardingEnabled) {
warning() << "could not remove sharding for collection " << ns
<< ", sharding not enabled for db";
@@ -211,7 +211,7 @@ bool DBConfig::removeSharding(const string& ns) {
}
ci.unshard();
- _save(false, true);
+ _save(txn, false, true);
return true;
}
@@ -255,21 +255,23 @@ void DBConfig::getChunkManagerOrPrimary(const string& ns,
}
-ChunkManagerPtr DBConfig::getChunkManagerIfExists(const string& ns,
+ChunkManagerPtr DBConfig::getChunkManagerIfExists(OperationContext* txn,
+ const string& ns,
bool shouldReload,
bool forceReload) {
// Don't report exceptions here as errors in GetLastError
LastError::Disabled ignoreForGLE(&LastError::get(cc()));
try {
- return getChunkManager(ns, shouldReload, forceReload);
+ return getChunkManager(txn, ns, shouldReload, forceReload);
} catch (AssertionException& e) {
warning() << "chunk manager not found for " << ns << causedBy(e);
return ChunkManagerPtr();
}
}
-std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns,
+std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn,
+ const string& ns,
bool shouldReload,
bool forceReload) {
BSONObj key;
@@ -282,7 +284,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns,
bool earlyReload = !_collections[ns].isSharded() && (shouldReload || forceReload);
if (earlyReload) {
// This is to catch cases where there this is a new sharded collection
- _load();
+ _load(txn);
}
CollectionInfo& ci = _collections[ns];
@@ -309,7 +311,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns,
// currently
vector<ChunkType> newestChunk;
if (oldVersion.isSet() && !forceReload) {
- uassertStatusOK(grid.catalogManager()->getChunks(
+ uassertStatusOK(grid.catalogManager(txn)->getChunks(
BSON(ChunkType::ns(ns)), BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, &newestChunk));
if (!newestChunk.empty()) {
@@ -356,13 +358,13 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns,
tempChunkManager.reset(new ChunkManager(
oldManager->getns(), oldManager->getShardKeyPattern(), oldManager->isUnique()));
- tempChunkManager->loadExistingRanges(oldManager.get());
+ tempChunkManager->loadExistingRanges(txn, oldManager.get());
if (tempChunkManager->numChunks() == 0) {
// Maybe we're not sharded any more, so do a full reload
- reload();
+ reload(txn);
- return getChunkManager(ns, false);
+ return getChunkManager(txn, ns, false);
}
}
@@ -411,21 +413,21 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns,
return ci.getCM();
}
-void DBConfig::setPrimary(const std::string& s) {
+void DBConfig::setPrimary(OperationContext* txn, const std::string& s) {
const auto shard = grid.shardRegistry()->getShard(s);
stdx::lock_guard<stdx::mutex> lk(_lock);
_primaryId = shard->getId();
- _save();
+ _save(txn);
}
-bool DBConfig::load() {
+bool DBConfig::load(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_lock);
- return _load();
+ return _load(txn);
}
-bool DBConfig::_load() {
- StatusWith<DatabaseType> status = grid.catalogManager()->getDatabase(_name);
+bool DBConfig::_load(OperationContext* txn) {
+ StatusWith<DatabaseType> status = grid.catalogManager(txn)->getDatabase(_name);
if (status == ErrorCodes::DatabaseNotFound) {
return false;
}
@@ -440,7 +442,7 @@ bool DBConfig::_load() {
// Load all collections
vector<CollectionType> collections;
- uassertStatusOK(grid.catalogManager()->getCollections(&_name, &collections));
+ uassertStatusOK(grid.catalogManager(txn)->getCollections(&_name, &collections));
int numCollsErased = 0;
int numCollsSharded = 0;
@@ -450,7 +452,7 @@ bool DBConfig::_load() {
_collections.erase(coll.getNs().ns());
numCollsErased++;
} else {
- _collections[coll.getNs().ns()] = CollectionInfo(coll);
+ _collections[coll.getNs().ns()] = CollectionInfo(txn, coll);
numCollsSharded++;
}
}
@@ -461,14 +463,14 @@ bool DBConfig::_load() {
return true;
}
-void DBConfig::_save(bool db, bool coll) {
+void DBConfig::_save(OperationContext* txn, bool db, bool coll) {
if (db) {
DatabaseType dbt;
dbt.setName(_name);
dbt.setPrimary(_primaryId);
dbt.setSharded(_shardingEnabled);
- uassertStatusOK(grid.catalogManager()->updateDatabase(_name, dbt));
+ uassertStatusOK(grid.catalogManager(txn)->updateDatabase(_name, dbt));
}
if (coll) {
@@ -477,17 +479,17 @@ void DBConfig::_save(bool db, bool coll) {
continue;
}
- i->second.save(i->first);
+ i->second.save(txn, i->first);
}
}
}
-bool DBConfig::reload() {
+bool DBConfig::reload(OperationContext* txn) {
bool successful = false;
{
stdx::lock_guard<stdx::mutex> lk(_lock);
- successful = _load();
+ successful = _load(txn);
}
// If we aren't successful loading the database entry, we don't want to keep the stale
@@ -508,14 +510,14 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
*/
log() << "DBConfig::dropDatabase: " << _name;
- grid.catalogManager()->logChange(
- txn->getClient()->clientAddress(true), "dropDatabase.start", _name, BSONObj());
+ grid.catalogManager(txn)
+ ->logChange(txn->getClient()->clientAddress(true), "dropDatabase.start", _name, BSONObj());
// 1
grid.catalogCache()->invalidate(_name);
- Status result = grid.catalogManager()->remove(
- DatabaseType::ConfigNS, BSON(DatabaseType::name(_name)), 0, NULL);
+ Status result = grid.catalogManager(txn)
+ ->remove(DatabaseType::ConfigNS, BSON(DatabaseType::name(_name)), 0, NULL);
if (!result.isOK()) {
errmsg = result.reason();
log() << "could not drop '" << _name << "': " << errmsg;
@@ -570,8 +572,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
LOG(1) << "\t dropped primary db for: " << _name;
- grid.catalogManager()->logChange(
- txn->getClient()->clientAddress(true), "dropDatabase", _name, BSONObj());
+ grid.catalogManager(txn)
+ ->logChange(txn->getClient()->clientAddress(true), "dropDatabase", _name, BSONObj());
return true;
}
@@ -604,11 +606,11 @@ bool DBConfig::_dropShardedCollections(OperationContext* txn,
i->second.getCM()->getAllShardIds(&shardIds);
- uassertStatusOK(grid.catalogManager()->dropCollection(txn, NamespaceString(i->first)));
+ uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(i->first)));
// We should warn, but it's not a fatal error if someone else reloaded the db/coll as
// unsharded in the meantime
- if (!removeSharding(i->first)) {
+ if (!removeSharding(txn, i->first)) {
warning() << "collection " << i->first
<< " was reloaded as unsharded before drop completed"
<< " during drop of all collections";
@@ -648,8 +650,9 @@ void DBConfig::getAllShardedCollections(set<string>& namespaces) {
/* --- ConfigServer ---- */
-void ConfigServer::reloadSettings() {
- auto chunkSizeResult = grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey);
+void ConfigServer::reloadSettings(OperationContext* txn) {
+ auto catalogManager = grid.catalogManager(txn);
+ auto chunkSizeResult = catalogManager->getGlobalSettings(SettingsType::ChunkSizeDocKey);
if (chunkSizeResult.isOK()) {
const int csize = chunkSizeResult.getValue().getChunkSizeMB();
LOG(1) << "Found MaxChunkSize: " << csize;
@@ -660,10 +663,10 @@ void ConfigServer::reloadSettings() {
} else if (chunkSizeResult.getStatus() == ErrorCodes::NoMatchingDocument) {
const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024);
Status result =
- grid.catalogManager()->insert(SettingsType::ConfigNS,
- BSON(SettingsType::key(SettingsType::ChunkSizeDocKey)
- << SettingsType::chunkSizeMB(chunkSize)),
- NULL);
+ grid.catalogManager(txn)->insert(SettingsType::ConfigNS,
+ BSON(SettingsType::key(SettingsType::ChunkSizeDocKey)
+ << SettingsType::chunkSizeMB(chunkSize)),
+ NULL);
if (!result.isOK()) {
warning() << "couldn't set chunkSize on config db" << causedBy(result);
}
@@ -672,7 +675,8 @@ void ConfigServer::reloadSettings() {
}
// indexes
- Status result = clusterCreateIndex(ChunkType::ConfigNS,
+ Status result = clusterCreateIndex(txn,
+ ChunkType::ConfigNS,
BSON(ChunkType::ns() << 1 << ChunkType::min() << 1),
true, // unique
NULL);
@@ -682,6 +686,7 @@ void ConfigServer::reloadSettings() {
}
result = clusterCreateIndex(
+ txn,
ChunkType::ConfigNS,
BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1),
true, // unique
@@ -691,7 +696,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create ns_1_shard_1_min_1 index on config db" << causedBy(result);
}
- result = clusterCreateIndex(ChunkType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ ChunkType::ConfigNS,
BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1),
true, // unique
NULL);
@@ -700,7 +706,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result);
}
- result = clusterCreateIndex(ShardType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ ShardType::ConfigNS,
BSON(ShardType::host() << 1),
true, // unique
NULL);
@@ -709,7 +716,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create host_1 index on config db" << causedBy(result);
}
- result = clusterCreateIndex(LocksType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ LocksType::ConfigNS,
BSON(LocksType::lockID() << 1),
false, // unique
NULL);
@@ -718,7 +726,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create lock id index on config db" << causedBy(result);
}
- result = clusterCreateIndex(LocksType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ LocksType::ConfigNS,
BSON(LocksType::state() << 1 << LocksType::process() << 1),
false, // unique
NULL);
@@ -727,7 +736,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create state and process id index on config db" << causedBy(result);
}
- result = clusterCreateIndex(LockpingsType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ LockpingsType::ConfigNS,
BSON(LockpingsType::ping() << 1),
false, // unique
NULL);
@@ -736,7 +746,8 @@ void ConfigServer::reloadSettings() {
warning() << "couldn't create lockping ping time index on config db" << causedBy(result);
}
- result = clusterCreateIndex(TagsType::ConfigNS,
+ result = clusterCreateIndex(txn,
+ TagsType::ConfigNS,
BSON(TagsType::ns() << 1 << TagsType::min() << 1),
true, // unique
NULL);
@@ -749,6 +760,7 @@ void ConfigServer::reloadSettings() {
void ConfigServer::replicaSetChange(const string& setName, const string& newConnectionString) {
// This is run in it's own thread. Exceptions escaping would result in a call to terminate.
Client::initThread("replSetChange");
+ auto txn = cc().makeOperationContext();
try {
std::shared_ptr<Shard> s = grid.shardRegistry()->lookupRSName(setName);
@@ -757,13 +769,13 @@ void ConfigServer::replicaSetChange(const string& setName, const string& newConn
return;
}
- Status result = grid.catalogManager()->update(
- ShardType::ConfigNS,
- BSON(ShardType::name(s->getId())),
- BSON("$set" << BSON(ShardType::host(newConnectionString))),
- false, // upsert
- false, // multi
- NULL);
+ Status result = grid.catalogManager(txn.get())
+ ->update(ShardType::ConfigNS,
+ BSON(ShardType::name(s->getId())),
+ BSON("$set" << BSON(ShardType::host(newConnectionString))),
+ false, // upsert
+ false, // multi
+ NULL);
if (!result.isOK()) {
error() << "RSChangeWatcher: could not update config db for set: " << setName
<< " to: " << newConnectionString << ": " << result.reason();
diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h
index 36d03949933..a779c8ee70c 100644
--- a/src/mongo/s/config.h
+++ b/src/mongo/s/config.h
@@ -48,7 +48,7 @@ struct CollectionInfo {
_dropped = false;
}
- CollectionInfo(const CollectionType& in);
+ CollectionInfo(OperationContext* txn, const CollectionType& in);
~CollectionInfo();
bool isSharded() const {
@@ -61,7 +61,7 @@ struct CollectionInfo {
void resetCM(ChunkManager* cm);
- void shard(ChunkManager* cm);
+ void shard(OperationContext* txn, ChunkManager* cm);
void unshard();
@@ -73,7 +73,7 @@ struct CollectionInfo {
return _dropped;
}
- void save(const std::string& ns);
+ void save(OperationContext* txn, const std::string& ns);
void useChunkManager(std::shared_ptr<ChunkManager> manager);
@@ -125,12 +125,12 @@ public:
*/
void invalidateNs(const std::string& ns);
- void enableSharding();
+ void enableSharding(OperationContext* txn);
/**
@return true if there was sharding info to remove
*/
- bool removeSharding(const std::string& ns);
+ bool removeSharding(OperationContext* txn, const std::string& ns);
/**
* @return whether or not the 'ns' collection is partitioned
@@ -143,10 +143,12 @@ public:
std::shared_ptr<ChunkManager>& manager,
std::shared_ptr<Shard>& primary);
- std::shared_ptr<ChunkManager> getChunkManager(const std::string& ns,
+ std::shared_ptr<ChunkManager> getChunkManager(OperationContext* txn,
+ const std::string& ns,
bool reload = false,
bool forceReload = false);
- std::shared_ptr<ChunkManager> getChunkManagerIfExists(const std::string& ns,
+ std::shared_ptr<ChunkManager> getChunkManagerIfExists(OperationContext* txn,
+ const std::string& ns,
bool reload = false,
bool forceReload = false);
@@ -155,10 +157,10 @@ public:
*/
const ShardId& getShardId(const std::string& ns);
- void setPrimary(const std::string& s);
+ void setPrimary(OperationContext* txn, const std::string& s);
- bool load();
- bool reload();
+ bool load(OperationContext* txn);
+ bool reload(OperationContext* txn);
bool dropDatabase(OperationContext*, std::string& errmsg);
@@ -173,9 +175,9 @@ protected:
std::set<ShardId>& shardIds,
std::string& errmsg);
- bool _load();
+ bool _load(OperationContext* txn);
- void _save(bool db = true, bool coll = true);
+ void _save(OperationContext* txn, bool db = true, bool coll = true);
// Name of the database which this entry caches
const std::string _name;
@@ -198,7 +200,7 @@ protected:
class ConfigServer {
public:
- static void reloadSettings();
+ static void reloadSettings(OperationContext* txn);
static void replicaSetChange(const std::string& setName,
const std::string& newConnectionString);
diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp
index e7ec42da278..bd399b3f832 100644
--- a/src/mongo/s/d_merge.cpp
+++ b/src/mongo/s/d_merge.cpp
@@ -53,7 +53,8 @@ using std::shared_ptr;
using std::string;
using mongoutils::str::stream;
-static Status runApplyOpsCmd(const std::vector<ChunkType>&,
+static Status runApplyOpsCmd(OperationContext* txn,
+ const std::vector<ChunkType>&,
const ChunkVersion&,
const ChunkVersion&);
@@ -70,7 +71,8 @@ bool mergeChunks(OperationContext* txn,
// Get the distributed lock
string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to "
<< maxKey;
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(nss.ns(), whyMessage);
+ auto scopedDistLock =
+ grid.catalogManager(txn)->getDistLockManager()->lock(nss.ns(), whyMessage);
if (!scopedDistLock.isOK()) {
*errMsg = stream() << "could not acquire collection lock for " << nss.ns()
@@ -229,7 +231,7 @@ bool mergeChunks(OperationContext* txn,
//
// Run apply ops command
//
- Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion);
+ Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion);
if (!applyOpsStatus.isOK()) {
warning() << applyOpsStatus;
return false;
@@ -253,8 +255,8 @@ bool mergeChunks(OperationContext* txn,
BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);
- grid.catalogManager()->logChange(
- txn->getClient()->clientAddress(true), "merge", nss.ns(), mergeLogEntry);
+ grid.catalogManager(txn)
+ ->logChange(txn->getClient()->clientAddress(true), "merge", nss.ns(), mergeLogEntry);
return true;
}
@@ -330,7 +332,8 @@ BSONArray buildOpPrecond(const string& ns,
return preCond.arr();
}
-Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
+Status runApplyOpsCmd(OperationContext* txn,
+ const std::vector<ChunkType>& chunksToMerge,
const ChunkVersion& currShardVersion,
const ChunkVersion& newMergedVersion) {
BSONArrayBuilder updatesB;
@@ -354,6 +357,6 @@ Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge,
}
BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
- return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond);
+ return grid.catalogManager(txn)->applyChunkOpsDeprecated(updatesB.arr(), preCond);
}
}
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 8bf5c089569..f1024978064 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -433,7 +433,7 @@ public:
string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey
<< ") in " << ns);
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage);
+ auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock(ns, whyMessage);
if (!scopedDistLock.isOK()) {
errmsg = stream() << "could not acquire collection lock for " << ns
@@ -447,8 +447,8 @@ public:
BSONObj chunkInfo =
BSON("min" << min << "max" << max << "from" << fromShardName << "to" << toShardName);
- grid.catalogManager()->logChange(
- txn->getClient()->clientAddress(true), "moveChunk.start", ns, chunkInfo);
+ grid.catalogManager(txn)
+ ->logChange(txn->getClient()->clientAddress(true), "moveChunk.start", ns, chunkInfo);
// Always refresh our metadata remotely
ChunkVersion origShardVersion;
@@ -578,7 +578,7 @@ public:
recvChunkStartBuilder.append("max", max);
recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern);
recvChunkStartBuilder.append("configServer",
- ShardingState::get(txn)->getConfigServer());
+ ShardingState::get(txn)->getConfigServer(txn));
recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle);
// Follow the same convention in moveChunk.
@@ -898,11 +898,11 @@ public:
}
applyOpsStatus =
- grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr());
+ grid.catalogManager(txn)->applyChunkOpsDeprecated(updates.arr(), preCond.arr());
if (MONGO_FAIL_POINT(failMigrationApplyOps)) {
throw SocketException(SocketException::RECV_ERROR,
- ShardingState::get(txn)->getConfigServer());
+ ShardingState::get(txn)->getConfigServer(txn));
}
} catch (const DBException& ex) {
warning() << ex << migrateLog;
@@ -950,11 +950,11 @@ public:
// we assume that if that mod made it to the config, the applyOps was successful
try {
std::vector<ChunkType> newestChunk;
- Status status = grid.catalogManager()->getChunks(
- BSON(ChunkType::ns(ns)),
- BSON(ChunkType::DEPRECATED_lastmod() << -1),
- 1,
- &newestChunk);
+ Status status = grid.catalogManager(txn)
+ ->getChunks(BSON(ChunkType::ns(ns)),
+ BSON(ChunkType::DEPRECATED_lastmod() << -1),
+ 1,
+ &newestChunk);
uassertStatusOK(status);
ChunkVersion checkVersion;
@@ -989,7 +989,7 @@ public:
commitInfo.appendElements(res["counts"].Obj());
}
- grid.catalogManager()->logChange(
+ grid.catalogManager(txn)->logChange(
txn->getClient()->clientAddress(true), "moveChunk.commit", ns, commitInfo.obj());
}
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp
index 1722899f7ab..0c6dd0f9ff5 100644
--- a/src/mongo/s/d_split.cpp
+++ b/src/mongo/s/d_split.cpp
@@ -603,7 +603,7 @@ public:
// Initialize our current shard name in the shard state if needed
shardingState->gotShardName(shardName);
- auto configLocStatus = ConnectionString::parse(shardingState->getConfigServer());
+ auto configLocStatus = ConnectionString::parse(shardingState->getConfigServer(txn));
if (!configLocStatus.isOK()) {
warning() << configLocStatus.getStatus();
return false;
@@ -617,7 +617,7 @@ public:
string whyMessage(str::stream() << "splitting chunk [" << minKey << ", " << maxKey
<< ") in " << ns);
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage);
+ auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock(ns, whyMessage);
if (!scopedDistLock.isOK()) {
errmsg = str::stream() << "could not acquire collection lock for " << ns
@@ -787,7 +787,7 @@ public:
// 4. apply the batch of updates to remote and local metadata
//
Status applyOpsStatus =
- grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr());
+ grid.catalogManager(txn)->applyChunkOpsDeprecated(updates.arr(), preCond.arr());
if (!applyOpsStatus.isOK()) {
return appendCommandStatus(result, applyOpsStatus);
}
@@ -824,8 +824,8 @@ public:
appendShortVersion(logDetail.subobjStart("left"), *newChunks[0]);
appendShortVersion(logDetail.subobjStart("right"), *newChunks[1]);
- grid.catalogManager()->logChange(
- txn->getClient()->clientAddress(true), "split", ns, logDetail.obj());
+ grid.catalogManager(txn)
+ ->logChange(txn->getClient()->clientAddress(true), "split", ns, logDetail.obj());
} else {
BSONObj beforeDetailObj = logDetail.obj();
BSONObj firstDetailObj = beforeDetailObj.getOwned();
@@ -838,7 +838,7 @@ public:
chunkDetail.append("of", newChunksSize);
appendShortVersion(chunkDetail.subobjStart("chunk"), *newChunks[i]);
- grid.catalogManager()->logChange(
+ grid.catalogManager(txn)->logChange(
txn->getClient()->clientAddress(true), "multi-split", ns, chunkDetail.obj());
}
}
diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp
index 594bef917fe..402df200a22 100644
--- a/src/mongo/s/d_state.cpp
+++ b/src/mongo/s/d_state.cpp
@@ -150,7 +150,7 @@ public:
ShardingState* shardingState = ShardingState::get(txn);
if (shardingState->enabled()) {
- result.append("configServer", shardingState->getConfigServer());
+ result.append("configServer", shardingState->getConfigServer(txn));
} else {
result.append("configServer", "");
}
@@ -213,7 +213,7 @@ public:
Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X);
OldClientContext ctx(txn, dbname);
- ShardingState::get(txn)->appendInfo(result);
+ ShardingState::get(txn)->appendInfo(txn, result);
return true;
}
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index 6c34f735607..db779ff4e17 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -60,16 +60,17 @@ void Grid::init(std::unique_ptr<CatalogManager> catalogManager,
_cursorManager = stdx::make_unique<ClusterCursorManager>();
}
-StatusWith<std::shared_ptr<DBConfig>> Grid::implicitCreateDb(const std::string& dbName) {
- auto status = catalogCache()->getDatabase(dbName);
+StatusWith<std::shared_ptr<DBConfig>> Grid::implicitCreateDb(OperationContext* txn,
+ const std::string& dbName) {
+ auto status = catalogCache()->getDatabase(txn, dbName);
if (status.isOK()) {
return status;
}
if (status == ErrorCodes::DatabaseNotFound) {
- auto statusCreateDb = catalogManager()->createDatabase(dbName);
+ auto statusCreateDb = catalogManager(txn)->createDatabase(dbName);
if (statusCreateDb.isOK() || statusCreateDb == ErrorCodes::NamespaceExists) {
- return catalogCache()->getDatabase(dbName);
+ return catalogCache()->getDatabase(txn, dbName);
}
return statusCreateDb;
@@ -103,8 +104,9 @@ bool Grid::shouldBalance(const SettingsType& balancerSettings) const {
return true;
}
-bool Grid::getConfigShouldBalance() const {
- auto balSettingsResult = grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey);
+bool Grid::getConfigShouldBalance(OperationContext* txn) const {
+ auto balSettingsResult =
+ grid.catalogManager(txn)->getGlobalSettings(SettingsType::BalancerDocKey);
if (!balSettingsResult.isOK()) {
warning() << balSettingsResult.getStatus();
return false;
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 7136e69b5c8..430ac7a29f6 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -40,6 +40,7 @@ class BSONObj;
class CatalogCache;
class CatalogManager;
class DBConfig;
+class OperationContext;
class SettingsType;
class ShardRegistry;
template <typename T>
@@ -68,7 +69,8 @@ public:
/**
* Implicitly creates the specified database as non-sharded.
*/
- StatusWith<std::shared_ptr<DBConfig>> implicitCreateDb(const std::string& dbName);
+ StatusWith<std::shared_ptr<DBConfig>> implicitCreateDb(OperationContext* txn,
+ const std::string& dbName);
/**
* @return true if shards and config servers are allowed to use 'localhost' in address
@@ -89,11 +91,12 @@ public:
/**
* Returns true if the config server settings indicate that the balancer should be active.
*/
- bool getConfigShouldBalance() const;
+ bool getConfigShouldBalance(OperationContext* txn) const;
- CatalogManager* catalogManager() {
+ CatalogManager* catalogManager(OperationContext* txn) {
return _catalogManager.get();
}
+
CatalogCache* catalogCache() {
return _catalogCache.get();
}
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h
index d450899031a..44bc5be18c2 100644
--- a/src/mongo/s/mock_ns_targeter.h
+++ b/src/mongo/s/mock_ns_targeter.h
@@ -85,7 +85,7 @@ public:
/**
* Returns a ShardEndpoint for the doc from the mock ranges
*/
- Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const {
+ Status targetInsert(OperationContext* txn, const BSONObj& doc, ShardEndpoint** endpoint) const {
std::vector<ShardEndpoint*> endpoints;
Status status = targetQuery(doc, &endpoints);
if (!status.isOK())
@@ -99,7 +99,8 @@ public:
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- Status targetUpdate(const BatchedUpdateDocument& updateDoc,
+ Status targetUpdate(OperationContext* txn,
+ const BatchedUpdateDocument& updateDoc,
std::vector<ShardEndpoint*>* endpoints) const {
return targetQuery(updateDoc.getQuery(), endpoints);
}
@@ -108,7 +109,8 @@ public:
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- Status targetDelete(const BatchedDeleteDocument& deleteDoc,
+ Status targetDelete(OperationContext* txn,
+ const BatchedDeleteDocument& deleteDoc,
std::vector<ShardEndpoint*>* endpoints) const {
return targetQuery(deleteDoc.getQuery(), endpoints);
}
@@ -138,7 +140,7 @@ public:
// No-op
}
- Status refreshIfNeeded(bool* wasChanged) {
+ Status refreshIfNeeded(OperationContext* txn, bool* wasChanged) {
// No-op
if (wasChanged)
*wasChanged = false;
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 0b8e513f7fa..7aed94536ea 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -40,6 +40,7 @@
namespace mongo {
+class OperationContext;
struct ShardEndpoint;
/**
@@ -81,14 +82,17 @@ public:
*
* Returns !OK with message if document could not be targeted for other reasons.
*/
- virtual Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const = 0;
+ virtual Status targetInsert(OperationContext* txn,
+ const BSONObj& doc,
+ ShardEndpoint** endpoint) const = 0;
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard update.
*
* Returns OK and fills the endpoints; returns a status describing the error otherwise.
*/
- virtual Status targetUpdate(const BatchedUpdateDocument& updateDoc,
+ virtual Status targetUpdate(OperationContext* txn,
+ const BatchedUpdateDocument& updateDoc,
std::vector<ShardEndpoint*>* endpoints) const = 0;
/**
@@ -96,7 +100,8 @@ public:
*
* Returns OK and fills the endpoints; returns a status describing the error otherwise.
*/
- virtual Status targetDelete(const BatchedDeleteDocument& deleteDoc,
+ virtual Status targetDelete(OperationContext* txn,
+ const BatchedDeleteDocument& deleteDoc,
std::vector<ShardEndpoint*>* endpoints) const = 0;
/**
@@ -141,7 +146,7 @@ public:
* NOTE: This function may block for shared resources or network calls.
* Returns !OK with message if could not refresh
*/
- virtual Status refreshIfNeeded(bool* wasChanged) = 0;
+ virtual Status refreshIfNeeded(OperationContext* txn, bool* wasChanged) = 0;
};
/**
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 0daabd89073..d3f03681a1a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -200,7 +200,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
std::vector<BSONObj>* results) {
invariant(results);
- auto dbConfig = grid.catalogCache()->getDatabase(query.nss().db().toString());
+ auto dbConfig = grid.catalogCache()->getDatabase(txn, query.nss().db().toString());
if (!dbConfig.isOK()) {
return dbConfig.getStatus();
}
@@ -229,7 +229,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
<< retries << " of " << kMaxStaleConfigRetries << ": " << status.reason();
invariant(chunkManager);
- chunkManager = chunkManager->reload();
+ chunkManager = chunkManager->reload(txn);
}
return {ErrorCodes::StaleShardVersion,
diff --git a/src/mongo/s/request.cpp b/src/mongo/s/request.cpp
index 4e5b5626730..1ea079f3a43 100644
--- a/src/mongo/s/request.cpp
+++ b/src/mongo/s/request.cpp
@@ -80,7 +80,7 @@ void Request::init() {
_didInit = true;
}
-void Request::process(int attempt) {
+void Request::process(OperationContext* txn, int attempt) {
init();
int op = _m.operation();
verify(op > dbMsg);
@@ -108,15 +108,15 @@ void Request::process(int attempt) {
<< ") for $cmd type ns - can only be 1 or -1",
n == 1 || n == -1);
- Strategy::clientCommandOp(*this);
+ Strategy::clientCommandOp(txn, *this);
} else {
- Strategy::queryOp(*this);
+ Strategy::queryOp(txn, *this);
}
} else if (op == dbGetMore) {
- Strategy::getMore(*this);
+ Strategy::getMore(txn, *this);
globalOpCounters.gotOp(op, iscmd);
} else {
- Strategy::writeOp(op, *this);
+ Strategy::writeOp(txn, op, *this);
// globalOpCounters are handled by write commands.
}
diff --git a/src/mongo/s/request.h b/src/mongo/s/request.h
index cee65b0dce2..6292610f971 100644
--- a/src/mongo/s/request.h
+++ b/src/mongo/s/request.h
@@ -35,6 +35,7 @@
namespace mongo {
class Client;
+class OperationContext;
class Request {
MONGO_DISALLOW_COPYING(Request);
@@ -72,7 +73,7 @@ public:
return _p;
}
- void process(int attempt = 0);
+ void process(OperationContext* txn, int attempt = 0);
void init();
diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp
index 661bc785fe9..275ca7ae3d2 100644
--- a/src/mongo/s/s_only.cpp
+++ b/src/mongo/s/s_only.cpp
@@ -144,7 +144,8 @@ void Command::execCommandClientBasic(OperationContext* txn,
appendCommandStatus(result, ok, errmsg);
}
-void Command::runAgainstRegistered(const char* ns,
+void Command::runAgainstRegistered(OperationContext* txn,
+ const char* ns,
BSONObj& jsobj,
BSONObjBuilder& anObjBuilder,
int queryOptions) {
@@ -165,8 +166,7 @@ void Command::runAgainstRegistered(const char* ns,
return;
}
- auto txn = cc().makeOperationContext();
- execCommandClientBasic(txn.get(), c, cc(), queryOptions, ns, jsobj, anObjBuilder);
+ execCommandClientBasic(txn, c, cc(), queryOptions, ns, jsobj, anObjBuilder);
}
void Command::registerError(OperationContext* txn, const DBException& exception) {}
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index d74f4823aa7..7467d763ec8 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -130,10 +130,11 @@ public:
virtual void process(Message& m, AbstractMessagingPort* p) {
verify(p);
Request r(m, p);
+ auto txn = cc().makeOperationContext();
try {
r.init();
- r.process();
+ r.process(txn.get());
} catch (const AssertionException& ex) {
LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed"
<< " while processing " << opToString(m.operation())
@@ -169,8 +170,13 @@ public:
void start(const MessageServer::Options& opts) {
balancer.go();
cursorCache.startTimeoutThread();
+
UserCacheInvalidator cacheInvalidatorThread(getGlobalAuthorizationManager());
- cacheInvalidatorThread.go();
+ {
+ auto txn = cc().makeOperationContext();
+ cacheInvalidatorThread.initialize(txn.get());
+ cacheInvalidatorThread.go();
+ }
PeriodicTask::startRunningPeriodicTasks();
@@ -190,13 +196,14 @@ DBClientBase* createDirectClient(OperationContext* txn) {
using namespace mongo;
-static Status initializeSharding(bool doUpgrade) {
+static Status initializeSharding(OperationContext* txn, bool doUpgrade) {
Status status = initializeGlobalShardingState(mongosGlobalParams.configdbs);
if (!status.isOK()) {
return status;
}
- status = grid.catalogManager()->checkAndUpgrade(!doUpgrade);
+ auto catalogManager = grid.catalogManager(txn);
+ status = catalogManager->checkAndUpgrade(!doUpgrade);
if (!status.isOK()) {
return status;
}
@@ -205,7 +212,7 @@ static Status initializeSharding(bool doUpgrade) {
return Status::OK();
}
- status = grid.catalogManager()->startup();
+ status = catalogManager->startup();
if (!status.isOK()) {
return status;
}
@@ -231,16 +238,19 @@ static ExitCode runMongosServer(bool doUpgrade) {
dbexit(EXIT_BADOPTIONS);
}
- Status status = initializeSharding(doUpgrade);
- if (!status.isOK()) {
- error() << "Error initializing sharding system: " << status;
- return EXIT_SHARDING_ERROR;
- }
- if (doUpgrade) {
- return EXIT_CLEAN;
- }
+ {
+ auto txn = cc().makeOperationContext();
+ Status status = initializeSharding(txn.get(), doUpgrade);
+ if (!status.isOK()) {
+ error() << "Error initializing sharding system: " << status;
+ return EXIT_SHARDING_ERROR;
+ }
+ if (doUpgrade) {
+ return EXIT_CLEAN;
+ }
- ConfigServer::reloadSettings();
+ ConfigServer::reloadSettings(txn.get());
+ }
#if !defined(_WIN32)
mongo::signalForkSuccess();
@@ -255,7 +265,7 @@ static ExitCode runMongosServer(bool doUpgrade) {
web.detach();
}
- status = getGlobalAuthorizationManager()->initialize(NULL);
+ Status status = getGlobalAuthorizationManager()->initialize(NULL);
if (!status.isOK()) {
error() << "Initializing authorization data failed: " << status;
return EXIT_SHARDING_ERROR;
@@ -434,7 +444,19 @@ void mongo::signalShutdown() {
void mongo::exitCleanly(ExitCode code) {
// TODO: do we need to add anything?
- grid.catalogManager()->shutDown();
+ {
+ Client& client = cc();
+ ServiceContext::UniqueOperationContext uniqueTxn;
+ OperationContext* txn = client.getOperationContext();
+ if (!txn) {
+ uniqueTxn = client.makeOperationContext();
+ txn = uniqueTxn.get();
+ }
+
+ auto catalogMgr = grid.catalogManager(txn);
+ catalogMgr->shutDown();
+ }
+
mongo::dbexit(code);
}
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 817a5d00682..c3475ad36f7 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -87,7 +87,7 @@ static bool _isSystemIndexes(const char* ns) {
/**
* Returns true if request is a query for sharded indexes.
*/
-static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
+static bool doShardedIndexQuery(OperationContext* txn, Request& r, const QuerySpec& qSpec) {
// Extract the ns field from the query, which may be embedded within the "query" or
// "$query" field.
auto nsField = qSpec.filter()["ns"];
@@ -96,7 +96,7 @@ static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
}
const NamespaceString indexNSSQuery(nsField.str());
- auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, indexNSSQuery.db().toString());
if (!status.isOK()) {
return false;
}
@@ -147,7 +147,7 @@ static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
return true;
}
-void Strategy::queryOp(Request& r) {
+void Strategy::queryOp(OperationContext* txn, Request& r) {
verify(!NamespaceString(r.getns()).isCommand());
Timer queryTimer;
@@ -157,7 +157,7 @@ void Strategy::queryOp(Request& r) {
QueryMessage q(r.d());
NamespaceString ns(q.ns);
- ClientBasic* client = ClientBasic::getCurrent();
+ ClientBasic* client = txn->getClient();
AuthorizationSession* authSession = AuthorizationSession::get(client);
Status status = authSession->checkAuthForQuery(ns, q.query);
audit::logQueryAuthzCheck(client, ns, q.query, status.code());
@@ -237,7 +237,7 @@ void Strategy::queryOp(Request& r) {
StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query);
uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK());
- if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) {
+ if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, r, qSpec)) {
return;
}
@@ -246,7 +246,7 @@ void Strategy::queryOp(Request& r) {
// TODO: Move out to Request itself, not strategy based
try {
- cursor->init();
+ cursor->init(txn);
if (qSpec.isExplain()) {
BSONObjBuilder explain_builder;
@@ -317,7 +317,7 @@ void Strategy::queryOp(Request& r) {
}
}
-void Strategy::clientCommandOp(Request& r) {
+void Strategy::clientCommandOp(OperationContext* txn, Request& r) {
QueryMessage q(r.d());
LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
@@ -333,7 +333,7 @@ void Strategy::clientCommandOp(Request& r) {
// Regular queries are handled in strategy_shard.cpp
verify(nss.isCommand() || nss.isSpecialCommand());
- if (handleSpecialNamespaces(r, q))
+ if (handleSpecialNamespaces(txn, r, q))
return;
int loops = 5;
@@ -367,7 +367,7 @@ void Strategy::clientCommandOp(Request& r) {
}
}
- Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions);
+ Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions);
BSONObj x = builder.done();
replyToQuery(0, r.p(), r.m(), x);
return;
@@ -383,9 +383,9 @@ void Strategy::clientCommandOp(Request& r) {
if (staleNS.size() == 0)
staleNS = q.ns;
- ShardConnection::checkMyConnectionVersions(staleNS);
+ ShardConnection::checkMyConnectionVersions(txn, staleNS);
if (loops < 4)
- versionManager.forceRemoteCheckShardVersionCB(staleNS);
+ versionManager.forceRemoteCheckShardVersionCB(txn, staleNS);
} catch (AssertionException& e) {
Command::appendCommandStatus(builder, e.toStatus());
BSONObj x = builder.done();
@@ -396,7 +396,7 @@ void Strategy::clientCommandOp(Request& r) {
}
// TODO: remove after MongoDB 3.2
-bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
+bool Strategy::handleSpecialNamespaces(OperationContext* txn, Request& r, QueryMessage& q) {
const char* ns = strstr(r.getns(), ".$cmd.sys.");
if (!ns)
return false;
@@ -404,7 +404,7 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
BSONObjBuilder reply;
- const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) {
+ const auto upgradeToRealCommand = [txn, &r, &q, &reply](StringData commandName) {
BSONObjBuilder cmdBob;
cmdBob.append(commandName, 1);
cmdBob.appendElements(q.query); // fields are validated by Commands
@@ -412,7 +412,7 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
NamespaceString nss(r.getns());
NamespaceString interposedNss(nss.db(), "$cmd");
Command::runAgainstRegistered(
- interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions);
+ txn, interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions);
};
if (strcmp(ns, "inprog") == 0) {
@@ -431,7 +431,8 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
return true;
}
-void Strategy::commandOp(const string& db,
+void Strategy::commandOp(OperationContext* txn,
+ const string& db,
const BSONObj& command,
int options,
const string& versionedNS,
@@ -442,7 +443,7 @@ void Strategy::commandOp(const string& db,
ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery));
// Initialize the cursor
- cursor.init();
+ cursor.init(txn);
set<ShardId> shardIds;
cursor.getQueryShardIds(shardIds);
@@ -458,14 +459,15 @@ void Strategy::commandOp(const string& db,
}
}
-Status Strategy::commandOpUnsharded(const std::string& db,
+Status Strategy::commandOpUnsharded(OperationContext* txn,
+ const std::string& db,
const BSONObj& command,
int options,
const std::string& versionedNS,
CommandResult* cmdResult) {
// Note that this implementation will not handle targeting retries and fails when the
// sharding metadata is too stale
- auto status = grid.catalogCache()->getDatabase(db);
+ auto status = grid.catalogCache()->getDatabase(txn, db);
if (!status.isOK()) {
mongoutils::str::stream ss;
ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS
@@ -507,7 +509,7 @@ Status Strategy::commandOpUnsharded(const std::string& db,
return Status::OK();
}
-void Strategy::getMore(Request& r) {
+void Strategy::getMore(OperationContext* txn, Request& r) {
Timer getMoreTimer;
const char* ns = r.getns();
@@ -517,7 +519,7 @@ void Strategy::getMore(Request& r) {
// TODO: Handle stale config exceptions here from coll being dropped or sharded during op
// for now has same semantics as legacy request
const NamespaceString nss(ns);
- auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (statusGetDb == ErrorCodes::DatabaseNotFound) {
cursorCache.remove(id);
replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
@@ -619,7 +621,7 @@ void Strategy::getMore(Request& r) {
}
}
-void Strategy::writeOp(int op, Request& r) {
+void Strategy::writeOp(OperationContext* txn, int op, Request& r) {
// make sure we have a last error
dassert(&LastError::get(cc()));
@@ -648,7 +650,7 @@ void Strategy::writeOp(int op, Request& r) {
{
// Disable the last error object for the duration of the write cmd
LastError::Disabled disableLastError(&LastError::get(cc()));
- Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0);
+ Command::runAgainstRegistered(txn, cmdNS.c_str(), requestBSON, builder, 0);
}
BatchedCommandResponse response;
diff --git a/src/mongo/s/strategy.h b/src/mongo/s/strategy.h
index 40d71f2fd4d..d815f4af955 100644
--- a/src/mongo/s/strategy.h
+++ b/src/mongo/s/strategy.h
@@ -34,16 +34,18 @@
namespace mongo {
+class OperationContext;
+
/**
* Legacy interface for processing client read/write/cmd requests.
*/
class Strategy {
public:
- static void queryOp(Request& r);
+ static void queryOp(OperationContext* txn, Request& r);
- static void getMore(Request& r);
+ static void getMore(OperationContext* txn, Request& r);
- static void writeOp(int op, Request& r);
+ static void writeOp(OperationContext* txn, int op, Request& r);
struct CommandResult {
ShardId shardTargetId;
@@ -60,7 +62,8 @@ public:
* TODO: Replace these methods and all other methods of command dispatch with a more general
* command op framework.
*/
- static void commandOp(const std::string& db,
+ static void commandOp(OperationContext* txn,
+ const std::string& db,
const BSONObj& command,
int options,
const std::string& versionedNS,
@@ -76,7 +79,8 @@ public:
* On success, fills in 'shardResult' with output from the namespace's primary shard. This
* output may itself indicate an error status on the shard.
*/
- static Status commandOpUnsharded(const std::string& db,
+ static Status commandOpUnsharded(OperationContext* txn,
+ const std::string& db,
const BSONObj& command,
int options,
const std::string& versionedNS,
@@ -87,10 +91,10 @@ public:
*
* DEPRECATED: should not be used by new code.
*/
- static void clientCommandOp(Request& r);
+ static void clientCommandOp(OperationContext* txn, Request& r);
protected:
- static bool handleSpecialNamespaces(Request& r, QueryMessage& q);
+ static bool handleSpecialNamespaces(OperationContext* txn, Request& r, QueryMessage& q);
};
} // namespace mongo
diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp
index 8faed905b60..6ba65451d4e 100644
--- a/src/mongo/s/version_manager.cpp
+++ b/src/mongo/s/version_manager.cpp
@@ -187,7 +187,7 @@ DBClientBase* getVersionable(DBClientBase* conn) {
* Eventually this should go completely away, but for now many commands rely on unversioned but
* mongos-specific behavior on mongod (auditing and replication information in commands)
*/
-bool initShardVersionEmptyNS(DBClientBase* conn_in) {
+bool initShardVersionEmptyNS(OperationContext* txn, DBClientBase* conn_in) {
bool ok;
BSONObj result;
DBClientBase* conn = NULL;
@@ -214,7 +214,7 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) {
ok = setShardVersion(*conn,
"",
- grid.catalogManager()->connectionString().toString(),
+ grid.catalogManager(txn)->connectionString().toString(),
ChunkVersion(),
NULL,
true,
@@ -273,7 +273,8 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) {
*
* @return true if we contacted the remote host
*/
-bool checkShardVersion(DBClientBase* conn_in,
+bool checkShardVersion(OperationContext* txn,
+ DBClientBase* conn_in,
const string& ns,
ChunkManagerPtr refManager,
bool authoritative,
@@ -282,10 +283,10 @@ bool checkShardVersion(DBClientBase* conn_in,
// Empty namespaces are special - we require initialization but not versioning
if (ns.size() == 0) {
- return initShardVersionEmptyNS(conn_in);
+ return initShardVersionEmptyNS(txn, conn_in);
}
- auto status = grid.catalogCache()->getDatabase(nsToDatabase(ns));
+ auto status = grid.catalogCache()->getDatabase(txn, nsToDatabase(ns));
if (!status.isOK()) {
return false;
}
@@ -300,7 +301,7 @@ bool checkShardVersion(DBClientBase* conn_in,
ShardPtr primary;
ChunkManagerPtr manager;
if (authoritative)
- conf->getChunkManagerIfExists(ns, true);
+ conf->getChunkManagerIfExists(txn, ns, true);
conf->getChunkManagerOrPrimary(ns, manager, primary);
@@ -372,7 +373,7 @@ bool checkShardVersion(DBClientBase* conn_in,
BSONObj result;
if (setShardVersion(*conn,
ns,
- grid.catalogManager()->connectionString().toString(),
+ grid.catalogManager(txn)->connectionString().toString(),
version,
manager.get(),
authoritative,
@@ -390,7 +391,7 @@ bool checkShardVersion(DBClientBase* conn_in,
if (!authoritative) {
// use the original connection and get a fresh versionable connection
// since conn can be invalidated (or worse, freed) after the failure
- checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1);
+ checkShardVersion(txn, conn_in, ns, refManager, 1, tryNumber + 1);
return true;
}
@@ -400,10 +401,10 @@ bool checkShardVersion(DBClientBase* conn_in,
<< ", connection state indicates significant version changes";
// reload db
- conf->reload();
+ conf->reload(txn);
} else {
// reload config
- conf->getChunkManager(ns, true);
+ conf->getChunkManager(txn, ns, true);
}
}
@@ -414,7 +415,7 @@ bool checkShardVersion(DBClientBase* conn_in,
sleepmillis(10 * tryNumber);
// use the original connection and get a fresh versionable connection
// since conn can be invalidated (or worse, freed) after the failure
- checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1);
+ checkShardVersion(txn, conn_in, ns, refManager, true, tryNumber + 1);
return true;
}
@@ -443,13 +444,13 @@ bool VersionManager::isVersionableCB(DBClientBase* conn) {
return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
}
-bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) {
+bool VersionManager::forceRemoteCheckShardVersionCB(OperationContext* txn, const string& ns) {
const NamespaceString nss(ns);
// This will force the database catalog entry to be reloaded
grid.catalogCache()->invalidate(nss.db().toString());
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
return false;
}
@@ -461,7 +462,7 @@ bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) {
return false;
}
- ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true);
+ ChunkManagerPtr manager = conf->getChunkManagerIfExists(txn, ns, true, true);
if (!manager) {
return false;
}
@@ -469,18 +470,20 @@ bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) {
return true;
}
-bool VersionManager::checkShardVersionCB(DBClientBase* conn_in,
+bool VersionManager::checkShardVersionCB(OperationContext* txn,
+ DBClientBase* conn_in,
const string& ns,
bool authoritative,
int tryNumber) {
- return checkShardVersion(conn_in, ns, nullptr, authoritative, tryNumber);
+ return checkShardVersion(txn, conn_in, ns, nullptr, authoritative, tryNumber);
}
-bool VersionManager::checkShardVersionCB(ShardConnection* conn_in,
+bool VersionManager::checkShardVersionCB(OperationContext* txn,
+ ShardConnection* conn_in,
bool authoritative,
int tryNumber) {
return checkShardVersion(
- conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber);
+ txn, conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber);
}
} // namespace mongo
diff --git a/src/mongo/s/version_manager.h b/src/mongo/s/version_manager.h
index 6dc6877d574..f03fb4f34c8 100644
--- a/src/mongo/s/version_manager.h
+++ b/src/mongo/s/version_manager.h
@@ -33,6 +33,7 @@
namespace mongo {
class DBClientBase;
+class OperationContext;
class ShardConnection;
@@ -41,9 +42,9 @@ public:
VersionManager() {}
bool isVersionableCB(DBClientBase*);
- bool forceRemoteCheckShardVersionCB(const std::string&);
- bool checkShardVersionCB(DBClientBase*, const std::string&, bool, int);
- bool checkShardVersionCB(ShardConnection*, bool, int);
+ bool forceRemoteCheckShardVersionCB(OperationContext* txn, const std::string&);
+ bool checkShardVersionCB(OperationContext*, DBClientBase*, const std::string&, bool, int);
+ bool checkShardVersionCB(OperationContext*, ShardConnection*, bool, int);
void resetShardVersionCB(DBClientBase*);
};
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 1603886e2ef..d2876b4e362 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -93,7 +93,8 @@ static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) {
// This only applies when no writes are occurring and metadata is not changing on reload
static const int kMaxRoundsWithoutProgress(5);
-void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest,
+void BatchWriteExec::executeBatch(OperationContext* txn,
+ const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse) {
LOG(4) << "starting execution of write batch of size "
<< static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS()
@@ -139,7 +140,8 @@ void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest,
// If we've already had a targeting error, we've refreshed the metadata once and can
// record target errors definitively.
bool recordTargetErrors = refreshedTargeter;
- Status targetStatus = batchOp.targetBatch(*_targeter, recordTargetErrors, &childBatches);
+ Status targetStatus =
+ batchOp.targetBatch(txn, *_targeter, recordTargetErrors, &childBatches);
if (!targetStatus.isOK()) {
// Don't do anything until a targeter refresh
_targeter->noteCouldNotTarget();
@@ -316,7 +318,7 @@ void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest,
//
bool targeterChanged = false;
- Status refreshStatus = _targeter->refreshIfNeeded(&targeterChanged);
+ Status refreshStatus = _targeter->refreshIfNeeded(txn, &targeterChanged);
if (!refreshStatus.isOK()) {
// It's okay if we can't refresh, we'll just record errors for the ops if
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index 708b9481aef..f4d869d3a20 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -43,6 +43,7 @@ namespace mongo {
class BatchWriteExecStats;
class MultiCommandDispatch;
+class OperationContext;
/**
* The BatchWriteExec is able to execute client batch write requests, resulting in a batch
@@ -71,7 +72,8 @@ public:
*
* This function does not throw, any errors are reported via the clientResponse.
*/
- void executeBatch(const BatchedCommandRequest& clientRequest,
+ void executeBatch(OperationContext* txn,
+ const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse);
const BatchWriteExecStats& getStats();
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 435ebe113cd..f1e654c5a33 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/db/operation_context_noop.h"
#include "mongo/s/client/mock_multi_write_command.h"
#include "mongo/s/mock_ns_targeter.h"
#include "mongo/s/mock_shard_resolver.h"
@@ -88,6 +89,7 @@ TEST(BatchWriteExecTests, SingleOp) {
// Basic execution test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
MockSingleShardBackend backend(nss);
@@ -100,7 +102,7 @@ TEST(BatchWriteExecTests, SingleOp) {
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
const BatchWriteExecStats& stats = backend.exec->getStats();
@@ -112,6 +114,7 @@ TEST(BatchWriteExecTests, SingleOpError) {
// Basic error test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
MockSingleShardBackend backend(nss);
@@ -133,7 +136,7 @@ TEST(BatchWriteExecTests, SingleOpError) {
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), 0);
ASSERT(response.isErrDetailsSet());
@@ -154,6 +157,7 @@ TEST(BatchWriteExecTests, StaleOp) {
// Retry op in exec b/c of stale config
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
// Insert request
@@ -176,7 +180,7 @@ TEST(BatchWriteExecTests, StaleOp) {
// Execute request
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
const BatchWriteExecStats& stats = backend.exec->getStats();
@@ -188,6 +192,7 @@ TEST(BatchWriteExecTests, MultiStaleOp) {
// Retry op in exec multiple times b/c of stale config
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
// Insert request
@@ -212,7 +217,7 @@ TEST(BatchWriteExecTests, MultiStaleOp) {
// Execute request
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
const BatchWriteExecStats& stats = backend.exec->getStats();
@@ -226,6 +231,7 @@ TEST(BatchWriteExecTests, TooManyStaleOp) {
// We should report a no progress error for everything in the batch
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
// Insert request
@@ -252,7 +258,7 @@ TEST(BatchWriteExecTests, TooManyStaleOp) {
// Execute request
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), 0);
ASSERT(response.isErrDetailsSet());
@@ -265,6 +271,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
// Retry op in exec many times b/c of stale config, but simulate remote migrations occurring
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
// Insert request
@@ -294,7 +301,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
// Execute request
BatchedCommandResponse response;
- backend.exec->executeBatch(request, &response);
+ backend.exec->executeBatch(&txn, request, &response);
ASSERT(response.getOk());
const BatchWriteExecStats& stats = backend.exec->getStats();
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 847b96ff9ee..57ea59c0b93 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -226,7 +226,8 @@ static void cancelBatches(const WriteErrorDetail& why,
batchMap->clear();
}
-Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
+Status BatchWriteOp::targetBatch(OperationContext* txn,
+ const NSTargeter& targeter,
bool recordTargetErrors,
vector<TargetedWriteBatch*>* targetedBatches) {
//
@@ -285,7 +286,7 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
OwnedPointerVector<TargetedWrite> writesOwned;
vector<TargetedWrite*>& writes = writesOwned.mutableVector();
- Status targetStatus = writeOp.targetWrites(targeter, &writes);
+ Status targetStatus = writeOp.targetWrites(txn, targeter, &writes);
if (!targetStatus.isOK()) {
WriteErrorDetail targetError;
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 0add9339268..2d50bd39ed5 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -44,6 +44,7 @@
namespace mongo {
+class OperationContext;
class TargetedWriteBatch;
struct ShardError;
struct ShardWCError;
@@ -105,7 +106,8 @@ public:
*
* Returned TargetedWriteBatches are owned by the caller.
*/
- Status targetBatch(const NSTargeter& targeter,
+ Status targetBatch(OperationContext* txn,
+ const NSTargeter& targeter,
bool recordTargetErrors,
std::vector<TargetedWriteBatch*>* targetedBatches);
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index da5f4e1775f..68dfc4deeb7 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/db/operation_context_noop.h"
#include "mongo/s/mock_ns_targeter.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_delete_document.h"
@@ -135,6 +136,7 @@ TEST(WriteOpTests, SingleOp) {
// Single-op targeting test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -151,7 +153,7 @@ TEST(WriteOpTests, SingleOp) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -174,6 +176,7 @@ TEST(WriteOpTests, SingleError) {
// Single-op error test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -190,7 +193,7 @@ TEST(WriteOpTests, SingleError) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -219,6 +222,7 @@ TEST(WriteOpTests, SingleTargetError) {
// Single-op targeting error test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -235,14 +239,14 @@ TEST(WriteOpTests, SingleTargetError) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(!status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 0u);
// Record targeting failures
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
ASSERT(status.isOK());
ASSERT(batchOp.isFinished());
@@ -261,6 +265,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) {
// write concern error if one occurs
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -277,7 +282,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -310,6 +315,7 @@ TEST(WriteOpTests, SingleStaleError) {
// We should retry the same batch until we're not stale
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -324,7 +330,7 @@ TEST(WriteOpTests, SingleStaleError) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(0, &response);
@@ -335,14 +341,14 @@ TEST(WriteOpTests, SingleStaleError) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
// Respond again with a stale response
batchOp.noteBatchResponse(*targeted.front(), response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
buildResponse(1, &response);
@@ -376,6 +382,7 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) {
// Multi-op targeting test (ordered)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -394,7 +401,7 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -419,6 +426,7 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) {
// Multi-op targeting test (unordered)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -437,7 +445,7 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -463,6 +471,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
// There should be two sets of single batches (one to each shard, one-by-one)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -482,7 +491,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -498,7 +507,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
@@ -521,6 +530,7 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) {
// There should be one set of two batches (one to each shard)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -540,7 +550,7 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -572,6 +582,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) {
// There should be two sets of two batches to each shard (two for each delete op)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -593,7 +604,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -614,7 +625,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
@@ -642,6 +653,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) {
// There should be one set of two batches to each shard (containing writes for both ops)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -663,7 +675,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -697,6 +709,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
// last ops should be batched together
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -724,7 +737,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -741,7 +754,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -762,7 +775,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -780,7 +793,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -808,6 +821,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) {
// Should batch all the ops together into two batches of four ops for each shard
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -835,7 +849,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -869,6 +883,7 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) {
// There should be one set of two batches to each shard and an error reported
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -887,7 +902,7 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -932,6 +947,7 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) {
// There should be one set of two batches to each shard and and two errors reported
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -950,7 +966,7 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -998,6 +1014,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) {
// There should be one set of two batches to each shard and an error reported
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1018,7 +1035,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -1064,6 +1081,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) {
// op should not get run
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1084,7 +1102,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
@@ -1134,6 +1152,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) {
// Don't suppress the error if ordered : false
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1151,7 +1170,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1178,6 +1197,7 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) {
// Suppress the write concern error if ordered and we also have an error
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1196,7 +1216,7 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1228,6 +1248,7 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) {
// Targeting failure on second op in batch op (ordered)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1246,14 +1267,14 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
// First targeting round fails since we may be stale
ASSERT(!status.isOK());
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
// Second targeting round is ok, but should stop at first write
ASSERT(status.isOK());
@@ -1269,7 +1290,7 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
// Second targeting round results in an error which finishes the batch
ASSERT(status.isOK());
@@ -1290,6 +1311,7 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) {
// Targeting failure on second op in batch op (unordered)
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1309,14 +1331,14 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
// First targeting round fails since we may be stale
ASSERT(!status.isOK());
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
// Second targeting round is ok, and should record an error
ASSERT(status.isOK());
@@ -1346,6 +1368,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) {
// Expect this gets translated down into write errors for first affected write
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1363,7 +1386,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1373,7 +1396,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
buildErrResponse(ErrorCodes::UnknownError, "mock error", &response);
@@ -1398,6 +1421,7 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) {
// Expect this gets translated down into write errors for all affected writes
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1416,7 +1440,7 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1450,6 +1474,7 @@ TEST(WriteOpTests, MultiOpAbortOrdered) {
// Expect this gets translated down into write error for first affected write
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1467,7 +1492,7 @@ TEST(WriteOpTests, MultiOpAbortOrdered) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1499,6 +1524,7 @@ TEST(WriteOpTests, MultiOpAbortUnordered) {
// Expect this gets translated down into write errors for all affected writes
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1539,6 +1565,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) {
// error
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED());
ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED());
@@ -1556,7 +1583,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
@@ -1567,7 +1594,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, true, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, true, &targeted);
// Second shard write write concern fails.
batchOp.noteBatchResponse(*targeted.front(), response, NULL);
@@ -1590,6 +1617,7 @@ TEST(WriteOpLimitTests, OneBigDoc) {
// Big single operation test - should go through
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1608,7 +1636,7 @@ TEST(WriteOpLimitTests, OneBigDoc) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
@@ -1624,6 +1652,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) {
// Big doc with smaller additional doc - should go through as two batches
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1644,7 +1673,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
@@ -1656,7 +1685,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
@@ -1670,6 +1699,7 @@ TEST(WriteOpLimitTests, TooManyOps) {
// Batch of 1002 documents
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1688,7 +1718,7 @@ TEST(WriteOpLimitTests, TooManyOps) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u);
@@ -1700,7 +1730,7 @@ TEST(WriteOpLimitTests, TooManyOps) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
@@ -1715,6 +1745,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) {
// calculation
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
MockNSTargeter targeter;
@@ -1750,7 +1781,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) {
OwnedPointerVector<TargetedWriteBatch> targetedOwned;
vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
- Status status = batchOp.targetBatch(targeter, false, &targeted);
+ Status status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
@@ -1766,7 +1797,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) {
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
- status = batchOp.targetBatch(targeter, false, &targeted);
+ status = batchOp.targetBatch(&txn, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index f630802121d..24dd177cbda 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -63,7 +63,8 @@ const WriteErrorDetail& WriteOp::getOpError() const {
return *_error;
}
-Status WriteOp::targetWrites(const NSTargeter& targeter,
+Status WriteOp::targetWrites(OperationContext* txn,
+ const NSTargeter& targeter,
std::vector<TargetedWrite*>* targetedWrites) {
bool isUpdate = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Update;
bool isDelete = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete;
@@ -74,16 +75,16 @@ Status WriteOp::targetWrites(const NSTargeter& targeter,
vector<ShardEndpoint*>& endpoints = endpointsOwned.mutableVector();
if (isUpdate) {
- targetStatus = targeter.targetUpdate(*_itemRef.getUpdate(), &endpoints);
+ targetStatus = targeter.targetUpdate(txn, *_itemRef.getUpdate(), &endpoints);
} else if (isDelete) {
- targetStatus = targeter.targetDelete(*_itemRef.getDelete(), &endpoints);
+ targetStatus = targeter.targetDelete(txn, *_itemRef.getDelete(), &endpoints);
} else {
dassert(_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert);
ShardEndpoint* endpoint = NULL;
// TODO: Remove the index targeting stuff once there is a command for it
if (!isIndexInsert) {
- targetStatus = targeter.targetInsert(_itemRef.getDocument(), &endpoint);
+ targetStatus = targeter.targetInsert(txn, _itemRef.getDocument(), &endpoint);
} else {
// TODO: Retry index writes with stale version?
targetStatus = targeter.targetCollection(&endpoints);
diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h
index fb189edfffb..bd50896b04a 100644
--- a/src/mongo/s/write_ops/write_op.h
+++ b/src/mongo/s/write_ops/write_op.h
@@ -122,7 +122,9 @@ public:
* Returns !OK if the targeting process itself fails
* (no TargetedWrites will be added, state unchanged)
*/
- Status targetWrites(const NSTargeter& targeter, std::vector<TargetedWrite*>* targetedWrites);
+ Status targetWrites(OperationContext* txn,
+ const NSTargeter& targeter,
+ std::vector<TargetedWrite*>* targetedWrites);
/**
* Returns the number of child writes that were last targeted.
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index dd8b116c7ed..a180fc8abdc 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/db/operation_context_noop.h"
#include "mongo/s/mock_ns_targeter.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_delete_document.h"
@@ -82,6 +83,7 @@ TEST(WriteOpTests, TargetSingle) {
// Basic targeting test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
@@ -103,7 +105,7 @@ TEST(WriteOpTests, TargetSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
@@ -139,6 +141,7 @@ TEST(WriteOpTests, TargetMultiOneShard) {
// Multi-write targeting test where our query goes to one shard
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion(10, 0, OID()));
@@ -164,7 +167,7 @@ TEST(WriteOpTests, TargetMultiOneShard) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
@@ -181,6 +184,7 @@ TEST(WriteOpTests, TargetMultiAllShards) {
// Multi-write targeting test where our write goes to more than one shard
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpointA("shardA", ChunkVersion(10, 0, OID()));
@@ -207,7 +211,7 @@ TEST(WriteOpTests, TargetMultiAllShards) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
@@ -232,6 +236,7 @@ TEST(WriteOpTests, ErrorSingle) {
// Single error after targeting test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
@@ -253,7 +258,7 @@ TEST(WriteOpTests, ErrorSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
@@ -277,6 +282,7 @@ TEST(WriteOpTests, CancelSingle) {
// Cancel single targeting test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
@@ -298,7 +304,7 @@ TEST(WriteOpTests, CancelSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
@@ -319,6 +325,7 @@ TEST(WriteOpTests, RetrySingleOp) {
// Retry single targeting test
//
+ OperationContextNoop txn;
NamespaceString nss("foo.bar");
ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
@@ -340,7 +347,7 @@ TEST(WriteOpTests, RetrySingleOp) {
OwnedPointerVector<TargetedWrite> targetedOwned;
vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(targeter, &targeted);
+ Status status = writeOp.targetWrites(&txn, targeter, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);