diff options
Diffstat (limited to 'src/mongo')
100 files changed, 918 insertions, 676 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp index 4dc63472b90..b03dae3daeb 100644 --- a/src/mongo/client/parallel.cpp +++ b/src/mongo/client/parallel.cpp @@ -60,13 +60,13 @@ using std::vector; LabeledLevel pc("pcursor", 2); -void ParallelSortClusteredCursor::init() { +void ParallelSortClusteredCursor::init(OperationContext* txn) { if (_didInit) return; _didInit = true; if (!_qSpec.isEmpty()) { - fullInit(); + fullInit(txn); } else { // You can only get here by using the legacy constructor // TODO: Eliminate this @@ -494,9 +494,9 @@ string ParallelSortClusteredCursor::toString() const { return str::stream() << "PCursor : " << toBSON(); } -void ParallelSortClusteredCursor::fullInit() { - startInit(); - finishInit(); +void ParallelSortClusteredCursor::fullInit(OperationContext* txn) { + startInit(txn); + finishInit(txn); } void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS, @@ -520,10 +520,11 @@ void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS, forceReload = tries > 2; } -void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS, +void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn, + const NamespaceString& staleNS, bool forceReload, bool fullReload) { - auto status = grid.catalogCache()->getDatabase(staleNS.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, staleNS.db().toString()); if (!status.isOK()) { warning() << "cannot reload database info for stale namespace " << staleNS.ns(); return; @@ -532,7 +533,7 @@ void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS, shared_ptr<DBConfig> config = status.getValue(); // Reload db if needed, make sure it works - if (fullReload && !config->reload()) { + if (fullReload && !config->reload(txn)) { // We didn't find the db after reload, the db may have been dropped, reset this ptr config.reset(); } @@ -541,7 +542,7 @@ void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS, warning() << "cannot reload database info for stale namespace " << staleNS.ns(); } else { // Reload chunk manager, potentially forcing the namespace - config->getChunkManagerIfExists(staleNS.ns(), true, forceReload); + config->getChunkManagerIfExists(txn, staleNS.ns(), true, forceReload); } } @@ -626,7 +627,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(PCStatePtr state, } } -void ParallelSortClusteredCursor::startInit() { +void ParallelSortClusteredCursor::startInit(OperationContext* txn) { const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns()); @@ -649,7 +650,7 @@ void ParallelSortClusteredCursor::startInit() { { shared_ptr<DBConfig> config; - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (status.getStatus().code() != ErrorCodes::DatabaseNotFound) { config = uassertStatusOK(status); config->getChunkManagerOrPrimary(nss.ns(), manager, primary); @@ -850,10 +851,10 @@ void ParallelSortClusteredCursor::startInit() { warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace " << staleNS; - _handleStaleNS(staleNS, forceReload, fullReload); + _handleStaleNS(txn, staleNS, forceReload, fullReload); // Restart with new chunk manager - startInit(); + startInit(txn); return; } catch (SocketException& e) { warning() << "socket exception when initializing on " << shardId @@ -924,7 +925,7 @@ void ParallelSortClusteredCursor::startInit() { } } -void ParallelSortClusteredCursor::finishInit() { +void ParallelSortClusteredCursor::finishInit(OperationContext* txn) { bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); bool specialVersion = _cInfo.versionedNS.size() > 0; string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns(); @@ -1070,13 +1071,13 @@ void ParallelSortClusteredCursor::finishInit() { warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS; - _handleStaleNS(staleNS, forceReload, fullReload); + _handleStaleNS(txn, staleNS, forceReload, fullReload); } } // Re-establish connections we need to - startInit(); - finishInit(); + startInit(txn); + finishInit(txn); return; } diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index 92132e182a1..ca6faa8644d 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -39,6 +39,7 @@ namespace mongo { class DBClientCursorHolder; +class OperationContext; class StaleConfigException; class ParallelConnectionMetadata; @@ -150,7 +151,7 @@ public: std::string getNS(); /** call before using */ - void init(); + void init(OperationContext* txn); bool more(); BSONObj next(); @@ -158,9 +159,9 @@ public: return "ParallelSort"; } - void fullInit(); - void startInit(); - void finishInit(); + void fullInit(OperationContext* txn); + void startInit(OperationContext* txn); + void finishInit(OperationContext* txn); bool isCommand() { return NamespaceString(_qSpec.ns()).isCommand(); @@ -217,7 +218,10 @@ private: const StaleConfigException& e, bool& forceReload, bool& fullReload); - void _handleStaleNS(const NamespaceString& staleNS, bool forceReload, bool fullReload); + void _handleStaleNS(OperationContext* txn, + const NamespaceString& staleNS, + bool forceReload, + bool fullReload); bool _didInit; bool _done; diff --git a/src/mongo/db/auth/authorization_manager.cpp b/src/mongo/db/auth/authorization_manager.cpp index 4430c43a5ab..37136f69114 100644 --- a/src/mongo/db/auth/authorization_manager.cpp +++ b/src/mongo/db/auth/authorization_manager.cpp @@ -413,18 +413,20 @@ Status AuthorizationManager::getUserDescription(OperationContext* txn, return _externalState->getUserDescription(txn, userName, result); } -Status AuthorizationManager::getRoleDescription(const RoleName& roleName, +Status AuthorizationManager::getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result) { - return _externalState->getRoleDescription(roleName, showPrivileges, result); + return _externalState->getRoleDescription(txn, roleName, showPrivileges, result); } -Status AuthorizationManager::getRoleDescriptionsForDB(const std::string dbname, +Status AuthorizationManager::getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, vector<BSONObj>* result) { return _externalState->getRoleDescriptionsForDB( - dbname, showPrivileges, showBuiltinRoles, result); + txn, dbname, showPrivileges, showBuiltinRoles, result); } Status AuthorizationManager::acquireUser(OperationContext* txn, diff --git a/src/mongo/db/auth/authorization_manager.h b/src/mongo/db/auth/authorization_manager.h index 9c7fdbaf9d0..995207723c8 100644 --- a/src/mongo/db/auth/authorization_manager.h +++ b/src/mongo/db/auth/authorization_manager.h @@ -232,7 +232,10 @@ public: * * If the role does not exist, returns ErrorCodes::RoleNotFound. */ - Status getRoleDescription(const RoleName& roleName, bool showPrivileges, BSONObj* result); + Status getRoleDescription(OperationContext* txn, + const RoleName& roleName, + bool showPrivileges, + BSONObj* result); /** * Writes into "result" documents describing the roles that are defined on the given @@ -247,7 +250,8 @@ public: * the document will contain a "warnings" array, with std::string messages describing * inconsistencies. */ - Status getRoleDescriptionsForDB(const std::string dbname, + Status getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, std::vector<BSONObj>* result); diff --git a/src/mongo/db/auth/authz_manager_external_state.h b/src/mongo/db/auth/authz_manager_external_state.h index e3bdcdb8c43..0efac6ec80b 100644 --- a/src/mongo/db/auth/authz_manager_external_state.h +++ b/src/mongo/db/auth/authz_manager_external_state.h @@ -103,7 +103,8 @@ public: * * If the role does not exist, returns ErrorCodes::RoleNotFound. */ - virtual Status getRoleDescription(const RoleName& roleName, + virtual Status getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result) = 0; @@ -120,7 +121,8 @@ public: * the document will contain a "warnings" array, with std::string messages describing * inconsistencies. */ - virtual Status getRoleDescriptionsForDB(const std::string dbname, + virtual Status getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, std::vector<BSONObj>* result) = 0; diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp index 7f410b605af..449c1e9dbfe 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp @@ -227,7 +227,8 @@ Status AuthzManagerExternalStateLocal::_getUserDocument(OperationContext* txn, return status; } -Status AuthzManagerExternalStateLocal::getRoleDescription(const RoleName& roleName, +Status AuthzManagerExternalStateLocal::getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result) { stdx::lock_guard<stdx::mutex> lk(_roleGraphMutex); @@ -286,7 +287,8 @@ Status AuthzManagerExternalStateLocal::_getRoleDescription_inlock(const RoleName return Status::OK(); } -Status AuthzManagerExternalStateLocal::getRoleDescriptionsForDB(const std::string dbname, +Status AuthzManagerExternalStateLocal::getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, vector<BSONObj>* result) { diff --git a/src/mongo/db/auth/authz_manager_external_state_local.h b/src/mongo/db/auth/authz_manager_external_state_local.h index f761279d03a..536404dcd51 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.h +++ b/src/mongo/db/auth/authz_manager_external_state_local.h @@ -67,10 +67,12 @@ public: virtual Status getUserDescription(OperationContext* txn, const UserName& userName, BSONObj* result); - virtual Status getRoleDescription(const RoleName& roleName, + virtual Status getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result); - virtual Status getRoleDescriptionsForDB(const std::string dbname, + virtual Status getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, std::vector<BSONObj>* result); diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp index b71cf0cf08c..ecb2d014c4f 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp @@ -70,7 +70,7 @@ Status AuthzManagerExternalStateMongos::getStoredAuthorizationVersion(OperationC BSONObj getParameterCmd = BSON("getParameter" << 1 << authSchemaVersionServerParameter << 1); BSONObjBuilder builder; const bool ok = - grid.catalogManager()->runUserManagementReadCommand("admin", getParameterCmd, &builder); + grid.catalogManager(txn)->runUserManagementReadCommand("admin", getParameterCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -96,7 +96,7 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn << "showCredentials" << true); BSONObjBuilder builder; const bool ok = - grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder); + grid.catalogManager(txn)->runUserManagementReadCommand("admin", usersInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -116,7 +116,8 @@ Status AuthzManagerExternalStateMongos::getUserDescription(OperationContext* txn return Status::OK(); } -Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleName, +Status AuthzManagerExternalStateMongos::getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result) { BSONObj rolesInfoCmd = @@ -126,7 +127,7 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN << roleName.getDB())) << "showPrivileges" << showPrivileges); BSONObjBuilder builder; const bool ok = - grid.catalogManager()->runUserManagementReadCommand("admin", rolesInfoCmd, &builder); + grid.catalogManager(txn)->runUserManagementReadCommand("admin", rolesInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -146,7 +147,8 @@ Status AuthzManagerExternalStateMongos::getRoleDescription(const RoleName& roleN return Status::OK(); } -Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::string dbname, +Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, std::vector<BSONObj>* result) { @@ -154,7 +156,7 @@ Status AuthzManagerExternalStateMongos::getRoleDescriptionsForDB(const std::stri << "showBuiltinRoles" << showBuiltinRoles); BSONObjBuilder builder; const bool ok = - grid.catalogManager()->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder); + grid.catalogManager(txn)->runUserManagementReadCommand(dbname, rolesInfoCmd, &builder); BSONObj cmdResult = builder.obj(); if (!ok) { return Command::getStatusFromCommandResult(cmdResult); @@ -169,7 +171,7 @@ bool AuthzManagerExternalStateMongos::hasAnyPrivilegeDocuments(OperationContext* BSONObj usersInfoCmd = BSON("usersInfo" << 1); BSONObjBuilder builder; const bool ok = - grid.catalogManager()->runUserManagementReadCommand("admin", usersInfoCmd, &builder); + grid.catalogManager(txn)->runUserManagementReadCommand("admin", usersInfoCmd, &builder); if (!ok) { // If we were unable to complete the query, // it's best to assume that there _are_ privilege documents. This might happen diff --git a/src/mongo/db/auth/authz_manager_external_state_s.h b/src/mongo/db/auth/authz_manager_external_state_s.h index 8de98a53552..bf9ca876c43 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.h +++ b/src/mongo/db/auth/authz_manager_external_state_s.h @@ -57,10 +57,12 @@ public: virtual Status getUserDescription(OperationContext* txn, const UserName& userName, BSONObj* result); - virtual Status getRoleDescription(const RoleName& roleName, + virtual Status getRoleDescription(OperationContext* txn, + const RoleName& roleName, bool showPrivileges, BSONObj* result); - virtual Status getRoleDescriptionsForDB(const std::string dbname, + virtual Status getRoleDescriptionsForDB(OperationContext* txn, + const std::string dbname, bool showPrivileges, bool showBuiltinRoles, std::vector<BSONObj>* result); diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp index dd44b200f60..8ad27ca4484 100644 --- a/src/mongo/db/auth/user_cache_invalidator_job.cpp +++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp @@ -88,10 +88,10 @@ public: } exportedIntervalParam; -StatusWith<OID> getCurrentCacheGeneration() { +StatusWith<OID> getCurrentCacheGeneration(OperationContext* txn) { try { BSONObjBuilder result; - const bool ok = grid.catalogManager()->runUserManagementReadCommand( + const bool ok = grid.catalogManager(txn)->runUserManagementReadCommand( "admin", BSON("_getUserCacheGeneration" << 1), &result); if (!ok) { return Command::getStatusFromCommandResult(result.obj()); @@ -107,8 +107,10 @@ StatusWith<OID> getCurrentCacheGeneration() { } // namespace UserCacheInvalidator::UserCacheInvalidator(AuthorizationManager* authzManager) - : _authzManager(authzManager) { - StatusWith<OID> currentGeneration = getCurrentCacheGeneration(); + : _authzManager(authzManager) {} + +void UserCacheInvalidator::initialize(OperationContext* txn) { + StatusWith<OID> currentGeneration = getCurrentCacheGeneration(txn); if (currentGeneration.isOK()) { _previousCacheGeneration = currentGeneration.getValue(); return; @@ -145,7 +147,8 @@ void UserCacheInvalidator::run() { break; } - StatusWith<OID> currentGeneration = getCurrentCacheGeneration(); + auto txn = cc().makeOperationContext(); + StatusWith<OID> currentGeneration = getCurrentCacheGeneration(txn.get()); if (!currentGeneration.isOK()) { if (currentGeneration.getStatus().code() == ErrorCodes::CommandNotFound) { warning() << "_getUserCacheGeneration command not found on config server(s), " diff --git a/src/mongo/db/auth/user_cache_invalidator_job.h b/src/mongo/db/auth/user_cache_invalidator_job.h index 3eb173b0a56..3b1c2544939 100644 --- a/src/mongo/db/auth/user_cache_invalidator_job.h +++ b/src/mongo/db/auth/user_cache_invalidator_job.h @@ -33,6 +33,7 @@ namespace mongo { class AuthorizationManager; +class OperationContext; /** * Background job that runs only in mongos and periodically checks in with the config servers @@ -42,7 +43,9 @@ class AuthorizationManager; */ class UserCacheInvalidator : public BackgroundJob { public: - explicit UserCacheInvalidator(AuthorizationManager* authzManager); + UserCacheInvalidator(AuthorizationManager* authzManager); + + void initialize(OperationContext* txn); protected: virtual std::string name() const; diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 2f25a87719a..ea5ec9092ad 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -284,8 +284,8 @@ public: // Counter for unknown commands static Counter64 unknownCommands; - /** @return if command was found */ - static void runAgainstRegistered(const char* ns, + static void runAgainstRegistered(OperationContext* txn, + const char* ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions = 0); diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 62b17b562f6..5529b5cc5a0 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1627,7 +1627,7 @@ public: result.append("result", config.outputOptions.collectionName); } - auto status = grid.catalogCache()->getDatabase(dbname); + auto status = grid.catalogCache()->getDatabase(txn, dbname); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -1636,7 +1636,7 @@ public: vector<ChunkPtr> chunks; if (confOut->isSharded(config.outputOptions.finalNamespace)) { - ChunkManagerPtr cm = confOut->getChunkManager(config.outputOptions.finalNamespace); + ChunkManagerPtr cm = confOut->getChunkManager(txn, config.outputOptions.finalNamespace); // Fetch result from other shards 1 chunk at a time. It would be better to do // just one big $or query, but then the sorting would not be efficient. @@ -1670,7 +1670,7 @@ public: BSONObj sortKey = BSON("_id" << 1); ParallelSortClusteredCursor cursor( servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); - cursor.init(); + cursor.init(txn); int chunkSize = 0; while (cursor.more() || !values.empty()) { diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp index 1645692daf4..b98ae7a5643 100644 --- a/src/mongo/db/commands/user_management_commands.cpp +++ b/src/mongo/db/commands/user_management_commands.cpp @@ -154,7 +154,8 @@ Status getCurrentUserRoles(OperationContext* txn, * same database as the role it is being added to (or that the role being added to is from the * "admin" database. */ -Status checkOkayToGrantRolesToRole(const RoleName& role, +Status checkOkayToGrantRolesToRole(OperationContext* txn, + const RoleName& role, const std::vector<RoleName> rolesToAdd, AuthorizationManager* authzManager) { for (std::vector<RoleName>::const_iterator it = rolesToAdd.begin(); it != rolesToAdd.end(); @@ -174,7 +175,7 @@ Status checkOkayToGrantRolesToRole(const RoleName& role, } BSONObj roleToAddDoc; - Status status = authzManager->getRoleDescription(roleToAdd, false, &roleToAddDoc); + Status status = authzManager->getRoleDescription(txn, roleToAdd, false, &roleToAddDoc); if (status == ErrorCodes::RoleNotFound) { return Status(ErrorCodes::RoleNotFound, "Cannot grant nonexistent role " + roleToAdd.toString()); @@ -725,7 +726,7 @@ public: // Role existence has to be checked after acquiring the update lock for (size_t i = 0; i < args.roles.size(); ++i) { BSONObj ignored; - status = authzManager->getRoleDescription(args.roles[i], false, &ignored); + status = authzManager->getRoleDescription(txn, args.roles[i], false, &ignored); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -842,7 +843,7 @@ public: if (args.hasRoles) { for (size_t i = 0; i < args.roles.size(); ++i) { BSONObj ignored; - status = authzManager->getRoleDescription(args.roles[i], false, &ignored); + status = authzManager->getRoleDescription(txn, args.roles[i], false, &ignored); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1077,7 +1078,7 @@ public: for (vector<RoleName>::iterator it = roles.begin(); it != roles.end(); ++it) { RoleName& roleName = *it; BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, false, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1157,7 +1158,7 @@ public: for (vector<RoleName>::iterator it = roles.begin(); it != roles.end(); ++it) { RoleName& roleName = *it; BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, false, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1381,7 +1382,7 @@ public: } // Role existence has to be checked after acquiring the update lock - status = checkOkayToGrantRolesToRole(args.roleName, args.roles, authzManager); + status = checkOkayToGrantRolesToRole(txn, args.roleName, args.roles, authzManager); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1471,13 +1472,13 @@ public: // Role existence has to be checked after acquiring the update lock BSONObj ignored; - status = authzManager->getRoleDescription(args.roleName, false, &ignored); + status = authzManager->getRoleDescription(txn, args.roleName, false, &ignored); if (!status.isOK()) { return appendCommandStatus(result, status); } if (args.hasRoles) { - status = checkOkayToGrantRolesToRole(args.roleName, args.roles, authzManager); + status = checkOkayToGrantRolesToRole(txn, args.roleName, args.roles, authzManager); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1568,7 +1569,7 @@ public: } BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, true, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, true, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1680,7 +1681,7 @@ public: } BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, true, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, true, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1798,13 +1799,13 @@ public: // Role existence has to be checked after acquiring the update lock BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, false, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } // Check for cycles - status = checkOkayToGrantRolesToRole(roleName, rolesToAdd, authzManager); + status = checkOkayToGrantRolesToRole(txn, roleName, rolesToAdd, authzManager); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1897,7 +1898,7 @@ public: } BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, false, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -1991,7 +1992,7 @@ public: } BSONObj roleDoc; - status = authzManager->getRoleDescription(roleName, false, &roleDoc); + status = authzManager->getRoleDescription(txn, roleName, false, &roleDoc); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -2262,7 +2263,7 @@ public: if (args.allForDB) { std::vector<BSONObj> rolesDocs; status = getGlobalAuthorizationManager()->getRoleDescriptionsForDB( - dbname, args.showPrivileges, args.showBuiltinRoles, &rolesDocs); + txn, dbname, args.showPrivileges, args.showBuiltinRoles, &rolesDocs); if (!status.isOK()) { return appendCommandStatus(result, status); } @@ -2274,7 +2275,7 @@ public: for (size_t i = 0; i < args.roleNames.size(); ++i) { BSONObj roleDetails; status = getGlobalAuthorizationManager()->getRoleDescription( - args.roleNames[i], args.showPrivileges, &roleDetails); + txn, args.roleNames[i], args.showPrivileges, &roleDetails); if (status.code() == ErrorCodes::RoleNotFound) { continue; } diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 93638a041dd..f103b0b1bae 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -1159,7 +1159,16 @@ void exitCleanly(ExitCode code) { getGlobalServiceContext()->setKillAllOperations(); repl::getGlobalReplicationCoordinator()->shutdown(); - auto catalogMgr = grid.catalogManager(); + + Client& client = cc(); + ServiceContext::UniqueOperationContext uniqueTxn; + OperationContext* txn = client.getOperationContext(); + if (!txn) { + uniqueTxn = client.makeOperationContext(); + txn = uniqueTxn.get(); + } + + auto catalogMgr = grid.catalogManager(txn); if (catalogMgr) { catalogMgr->shutDown(); } diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 172ba5f5d5d..3d380605d58 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -54,9 +54,20 @@ public: OperationContextNoop(Client* client, unsigned int opId, Locker* locker, RecoveryUnit* ru) : OperationContext(client, opId, locker), _recoveryUnit(ru) { _locker.reset(lockState()); + + if (client) { + stdx::lock_guard<Client> lk(*client); + client->setOperationContext(this); + } } - virtual ~OperationContextNoop() = default; + virtual ~OperationContextNoop() { + auto client = getClient(); + if (client) { + stdx::lock_guard<Client> lk(*client); + client->resetOperationContext(); + } + } virtual RecoveryUnit* recoveryUnit() const override { return _recoveryUnit.get(); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 8236d936c21..9ff279cdc45 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -955,7 +955,7 @@ MoveTimingHelper::~MoveTimingHelper() { _b.append("errmsg", *_cmdErrmsg); } - grid.catalogManager()->logChange( + grid.catalogManager(_txn)->logChange( _txn->getClient()->clientAddress(true), (string) "moveChunk." + _where, _ns, _b.obj()); } catch (const std::exception& e) { warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index 9f99cdce5a5..6010fa8a513 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -324,15 +324,15 @@ private: } if (ShardingState::get(txn)->enabled()) { - if (configdb == ShardingState::get(txn)->getConfigServer()) + if (configdb == ShardingState::get(txn)->getConfigServer(txn)) return true; result.append("configdb", - BSON("stored" << ShardingState::get(txn)->getConfigServer() << "given" + BSON("stored" << ShardingState::get(txn)->getConfigServer(txn) << "given" << configdb)); errmsg = str::stream() << "mongos specified a different config database string : " - << "stored : " << ShardingState::get(txn)->getConfigServer() + << "stored : " << ShardingState::get(txn)->getConfigServer(txn) << " vs given : " << configdb; return false; } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index cab256a66f6..4aee88c7307 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -84,11 +84,11 @@ bool ShardingState::enabled() { return _enabled; } -string ShardingState::getConfigServer() { +string ShardingState::getConfigServer(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_enabled); - return grid.catalogManager()->connectionString().toString(); + return grid.catalogManager(txn)->connectionString().toString(); } string ShardingState::getShardName() { @@ -545,7 +545,7 @@ Status ShardingState::doRefreshMetadata(OperationContext* txn, shared_ptr<CollectionMetadata> remoteMetadata(remoteMetadataRaw); Timer refreshTimer; - Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(), + Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(txn), ns, getShardName(), fullReload ? NULL : beforeMetadata.get(), @@ -738,7 +738,7 @@ Status ShardingState::doRefreshMetadata(OperationContext* txn, return Status::OK(); } -void ShardingState::appendInfo(BSONObjBuilder& builder) { +void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { stdx::lock_guard<stdx::mutex> lk(_mutex); builder.appendBool("enabled", _enabled); @@ -746,7 +746,7 @@ void ShardingState::appendInfo(BSONObjBuilder& builder) { return; } - builder.append("configServer", grid.catalogManager()->connectionString().toString()); + builder.append("configServer", grid.catalogManager(txn)->connectionString().toString()); builder.append("shardName", _shardName); BSONObjBuilder versionB(builder.subobjStart("versions")); diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 2a9d835027f..ff6248783f3 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -72,7 +72,7 @@ public: bool enabled(); - std::string getConfigServer(); + std::string getConfigServer(OperationContext* txn); std::string getShardName(); // Initialize sharding state and begin authenticating outgoing connections and handling @@ -149,7 +149,7 @@ public: const std::string& ns, ChunkVersion* latestShardVersion); - void appendInfo(BSONObjBuilder& b); + void appendInfo(OperationContext* txn, BSONObjBuilder& b); // querying support diff --git a/src/mongo/db/storage/record_store_test_harness.h b/src/mongo/db/storage/record_store_test_harness.h index 69d7640047f..cc613c3bd3b 100644 --- a/src/mongo/db/storage/record_store_test_harness.h +++ b/src/mongo/db/storage/record_store_test_harness.h @@ -57,7 +57,7 @@ public: } std::unique_ptr<OperationContext> newOperationContext() { - return newOperationContext(client()); + return newOperationContext(nullptr); } /** diff --git a/src/mongo/db/storage/record_store_test_touch.cpp b/src/mongo/db/storage/record_store_test_touch.cpp index c765bc2638b..14181ec0c9b 100644 --- a/src/mongo/db/storage/record_store_test_touch.cpp +++ b/src/mongo/db/storage/record_store_test_touch.cpp @@ -51,7 +51,8 @@ TEST(RecordStoreTestHarness, TouchEmpty) { } { - unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); + unique_ptr<OperationContext> opCtx( + harnessHelper->newOperationContext(harnessHelper->client())); { BSONObjBuilder stats; Status status = rs->touch(opCtx.get(), &stats); @@ -93,7 +94,8 @@ TEST(RecordStoreTestHarness, TouchNonEmpty) { } { - unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); + unique_ptr<OperationContext> opCtx( + harnessHelper->newOperationContext(harnessHelper->client())); { BSONObjBuilder stats; // XXX does not verify the collection was loaded into cache @@ -116,7 +118,8 @@ TEST(RecordStoreTestHarness, TouchEmptyWithNullStats) { } { - unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); + unique_ptr<OperationContext> opCtx( + harnessHelper->newOperationContext(harnessHelper->client())); Status status = rs->touch(opCtx.get(), NULL /* stats output */); ASSERT(status.isOK() || status.code() == ErrorCodes::CommandNotSupported); } @@ -155,7 +158,8 @@ TEST(RecordStoreTestHarness, TouchNonEmptyWithNullStats) { } { - unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); + unique_ptr<OperationContext> opCtx( + harnessHelper->newOperationContext(harnessHelper->client())); // XXX does not verify the collection was loaded into cache // (even if supported by storage engine) Status status = rs->touch(opCtx.get(), NULL /* stats output */); diff --git a/src/mongo/dbtests/chunk_manager_tests.cpp b/src/mongo/dbtests/chunk_manager_tests.cpp index 9d64c226df2..9a04ff1bb3a 100644 --- a/src/mongo/dbtests/chunk_manager_tests.cpp +++ b/src/mongo/dbtests/chunk_manager_tests.cpp @@ -111,7 +111,7 @@ protected: ShardKeyPattern shardKeyPattern(BSON(keyName << 1)); ChunkManager manager(_collName, shardKeyPattern, false); - manager.createFirstChunks(_shardId, &splitKeys, NULL); + manager.createFirstChunks(&_txn, _shardId, &splitKeys, NULL); } }; @@ -181,7 +181,7 @@ TEST_F(ChunkManagerTests, Basic) { collType.setDropped(false); ChunkManager manager(collType); - manager.loadExistingRanges(nullptr); + manager.loadExistingRanges(&_txn, nullptr); ASSERT(manager.getVersion().epoch() == version.epoch()); ASSERT(manager.getVersion().minorVersion() == (numChunks - 1)); @@ -196,7 +196,7 @@ TEST_F(ChunkManagerTests, Basic) { // Make new manager load chunk diff ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique()); - newManager.loadExistingRanges(&manager); + newManager.loadExistingRanges(&_txn, &manager); ASSERT(newManager.getVersion().toLong() == laterVersion.toLong()); ASSERT(newManager.getVersion().epoch() == laterVersion.epoch()); diff --git a/src/mongo/dbtests/config_upgrade_tests.cpp b/src/mongo/dbtests/config_upgrade_tests.cpp index 7e1eb958c46..a108ebcd524 100644 --- a/src/mongo/dbtests/config_upgrade_tests.cpp +++ b/src/mongo/dbtests/config_upgrade_tests.cpp @@ -165,7 +165,7 @@ TEST_F(ConfigUpgradeTests, EmptyVersion) { // Zero version (no version doc) VersionType oldVersion; - Status status = getConfigVersion(grid.catalogManager(), &oldVersion); + Status status = getConfigVersion(grid.catalogManager(&_txn), &oldVersion); ASSERT(status.isOK()); ASSERT_EQUALS(oldVersion.getMinCompatibleVersion(), 0); @@ -185,7 +185,7 @@ TEST_F(ConfigUpgradeTests, ClusterIDVersion) { newVersion.clear(); // Current Version w/o clusterId (invalid!) - Status status = getConfigVersion(grid.catalogManager(), &newVersion); + Status status = getConfigVersion(grid.catalogManager(&_txn), &newVersion); ASSERT(!status.isOK()); newVersion.clear(); @@ -201,7 +201,7 @@ TEST_F(ConfigUpgradeTests, ClusterIDVersion) { newVersion.clear(); // Current version w/ clusterId (valid!) - status = getConfigVersion(grid.catalogManager(), &newVersion); + status = getConfigVersion(grid.catalogManager(&_txn), &newVersion); ASSERT(status.isOK()); ASSERT_EQUALS(newVersion.getMinCompatibleVersion(), MIN_COMPATIBLE_CONFIG_VERSION); @@ -218,8 +218,8 @@ TEST_F(ConfigUpgradeTests, InitialUpgrade) { VersionType versionOld; VersionType version; string errMsg; - bool result = - checkAndUpgradeConfigVersion(grid.catalogManager(), false, &versionOld, &version, &errMsg); + bool result = checkAndUpgradeConfigVersion( + grid.catalogManager(&_txn), false, &versionOld, &version, &errMsg); ASSERT(result); ASSERT_EQUALS(versionOld.getCurrentVersion(), 0); @@ -241,8 +241,8 @@ TEST_F(ConfigUpgradeTests, BadVersionUpgrade) { VersionType versionOld; VersionType version; string errMsg; - bool result = - checkAndUpgradeConfigVersion(grid.catalogManager(), false, &versionOld, &version, &errMsg); + bool result = checkAndUpgradeConfigVersion( + grid.catalogManager(&_txn), false, &versionOld, &version, &errMsg); ASSERT(!result); } @@ -257,11 +257,11 @@ TEST_F(ConfigUpgradeTests, CheckMongoVersion) { storeShardsAndPings(5, 10); // 5 shards, 10 pings // Our version is >= 2.2, so this works - Status status = checkClusterMongoVersions(grid.catalogManager(), "2.2"); + Status status = checkClusterMongoVersions(grid.catalogManager(&_txn), "2.2"); ASSERT(status.isOK()); // Our version is < 9.9, so this doesn't work (until we hit v99.99) - status = checkClusterMongoVersions(grid.catalogManager(), "99.99"); + status = checkClusterMongoVersions(grid.catalogManager(&_txn), "99.99"); ASSERT(status.code() == ErrorCodes::RemoteValidationError); } diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index 6fd5d7dc923..9f181f49ac9 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -70,12 +70,16 @@ int runDbTests(int argc, char** argv) { ShardingState::get(getGlobalServiceContext())->initialize("$dummy:10000"); // Note: ShardingState::initialize also initializes the distLockMgr. - auto distLockMgr = - dynamic_cast<LegacyDistLockManager*>(grid.catalogManager()->getDistLockManager()); - if (distLockMgr) { - distLockMgr->enablePinger(false); + { + auto txn = cc().makeOperationContext(); + auto distLockMgr = dynamic_cast<LegacyDistLockManager*>( + grid.catalogManager(txn.get())->getDistLockManager()); + if (distLockMgr) { + distLockMgr->enablePinger(false); + } } + int ret = unittest::Suite::run(frameworkGlobalParams.suites, frameworkGlobalParams.filter, frameworkGlobalParams.runsPerTest); diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 4e5f49f169c..676f1fe21ad 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -155,6 +155,7 @@ env.CppUnitTest( 'cluster_ops', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/range_arithmetic', + '$BUILD_DIR/mongo/db/service_context', ] ) diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index ac476a8c67e..fdd575c196c 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -80,7 +80,8 @@ Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {} Balancer::~Balancer() = default; -int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks, +int Balancer::_moveChunks(OperationContext* txn, + const vector<shared_ptr<MigrateInfo>>& candidateChunks, const WriteConcernOptions* writeConcern, bool waitForDelete) { int movedCount = 0; @@ -89,7 +90,7 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks // If the balancer was disabled since we started this round, don't start new chunks // moves. const auto balSettingsResult = - grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); + grid.catalogManager(txn)->getGlobalSettings(SettingsType::BalancerDocKey); const bool isBalSettingsAbsent = balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; @@ -121,25 +122,25 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks const NamespaceString nss(migrateInfo->ns); try { - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); fassert(28628, status.getStatus()); shared_ptr<DBConfig> cfg = status.getValue(); // NOTE: We purposely do not reload metadata here, since _doBalanceRound already // tried to do so once. - shared_ptr<ChunkManager> cm = cfg->getChunkManager(migrateInfo->ns); + shared_ptr<ChunkManager> cm = cfg->getChunkManager(txn, migrateInfo->ns); invariant(cm); - ChunkPtr c = cm->findIntersectingChunk(migrateInfo->chunk.min); + ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min); if (c->getMin().woCompare(migrateInfo->chunk.min) || c->getMax().woCompare(migrateInfo->chunk.max)) { // Likely a split happened somewhere, so force reload the chunk manager - cm = cfg->getChunkManager(migrateInfo->ns, true); + cm = cfg->getChunkManager(txn, migrateInfo->ns, true); invariant(cm); - c = cm->findIntersectingChunk(migrateInfo->chunk.min); + c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min); if (c->getMin().woCompare(migrateInfo->chunk.min) || c->getMax().woCompare(migrateInfo->chunk.max)) { @@ -151,7 +152,8 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks } BSONObj res; - if (c->moveAndCommit(migrateInfo->to, + if (c->moveAndCommit(txn, + migrateInfo->to, Chunk::MaxChunkSize, writeConcern, waitForDelete, @@ -167,20 +169,20 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks if (res["chunkTooBig"].trueValue()) { // Reload just to be safe - cm = cfg->getChunkManager(migrateInfo->ns); + cm = cfg->getChunkManager(txn, migrateInfo->ns); invariant(cm); - c = cm->findIntersectingChunk(migrateInfo->chunk.min); + c = cm->findIntersectingChunk(txn, migrateInfo->chunk.min); log() << "performing a split because migrate failed for size reasons"; - Status status = c->split(Chunk::normal, NULL, NULL); + Status status = c->split(txn, Chunk::normal, NULL, NULL); log() << "split results: " << status; if (!status.isOK()) { log() << "marking chunk as jumbo: " << c->toString(); - c->markAsJumbo(); + c->markAsJumbo(txn); // We increment moveCount so we do another round right away movedCount++; @@ -195,7 +197,7 @@ int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks return movedCount; } -void Balancer::_ping(bool waiting) { +void Balancer::_ping(OperationContext* txn, bool waiting) { MongosType mType; mType.setName(_myid); mType.setPing(jsTime()); @@ -203,12 +205,12 @@ void Balancer::_ping(bool waiting) { mType.setWaiting(waiting); mType.setMongoVersion(versionString); - grid.catalogManager()->update(MongosType::ConfigNS, - BSON(MongosType::name(_myid)), - BSON("$set" << mType.toBSON()), - true, - false, - NULL); + grid.catalogManager(txn)->update(MongosType::ConfigNS, + BSON(MongosType::name(_myid)), + BSON("$set" << mType.toBSON()), + true, + false, + NULL); } bool Balancer::_checkOIDs() { @@ -282,11 +284,12 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) { } } -void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) { +void Balancer::_doBalanceRound(OperationContext* txn, + vector<shared_ptr<MigrateInfo>>* candidateChunks) { invariant(candidateChunks); vector<CollectionType> collections; - Status collsStatus = grid.catalogManager()->getCollections(nullptr, &collections); + Status collsStatus = grid.catalogManager(txn)->getCollections(nullptr, &collections); if (!collsStatus.isOK()) { warning() << "Failed to retrieve the set of collections during balancing round " << collsStatus; @@ -304,7 +307,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) // // TODO: skip unresponsive shards and mark information as stale. ShardInfoMap shardInfo; - Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); + Status loadStatus = DistributionStatus::populateShardInfoMap(txn, &shardInfo); if (!loadStatus.isOK()) { warning() << "failed to load shard metadata" << causedBy(loadStatus); return; @@ -328,10 +331,10 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) } std::vector<ChunkType> allNsChunks; - grid.catalogManager()->getChunks(BSON(ChunkType::ns(nss.ns())), - BSON(ChunkType::min() << 1), - boost::none, // all chunks - &allNsChunks); + grid.catalogManager(txn)->getChunks(BSON(ChunkType::ns(nss.ns())), + BSON(ChunkType::min() << 1), + boost::none, // all chunks + &allNsChunks); set<BSONObj> allChunkMinimums; map<string, vector<ChunkType>> shardToChunksMap; @@ -362,7 +365,8 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) { vector<TagsType> collectionTags; - uassertStatusOK(grid.catalogManager()->getTagsForCollection(nss.ns(), &collectionTags)); + uassertStatusOK( + grid.catalogManager(txn)->getTagsForCollection(nss.ns(), &collectionTags)); for (const auto& tt : collectionTags) { ranges.push_back( TagRange(tt.getMinKey().getOwned(), tt.getMaxKey().getOwned(), tt.getTag())); @@ -372,7 +376,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) } } - auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString()); + auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!statusGetDb.isOK()) { warning() << "could not load db config to balance collection [" << nss.ns() << "]: " << statusGetDb.getStatus(); @@ -383,7 +387,7 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) // This line reloads the chunk manager once if this process doesn't know the collection // is sharded yet. - shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(nss.ns(), true); + shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(txn, nss.ns(), true); if (!cm) { warning() << "could not load chunks to balance " << nss.ns() << " collection"; continue; @@ -405,12 +409,12 @@ void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) log() << "nss: " << nss.ns() << " need to split on " << min << " because there is a range there"; - ChunkPtr c = cm->findIntersectingChunk(min); + ChunkPtr c = cm->findIntersectingChunk(txn, min); vector<BSONObj> splitPoints; splitPoints.push_back(min); - Status status = c->multiSplit(splitPoints, NULL); + Status status = c->multiSplit(txn, splitPoints, NULL); if (!status.isOK()) { error() << "split failed: " << status; } else { @@ -479,6 +483,8 @@ void Balancer::run() { const int sleepTime = 10; while (!inShutdown()) { + auto txn = cc().makeOperationContext(); + Timer balanceRoundTimer; ActionLogType actionLog; @@ -487,7 +493,7 @@ void Balancer::run() { try { // ping has to be first so we keep things in the config server in sync - _ping(); + _ping(txn.get()); BSONObj balancerResult; @@ -495,10 +501,10 @@ void Balancer::run() { Shard::reloadShardInfo(); // refresh chunk size (even though another balancer might be active) - Chunk::refreshChunkSize(); + Chunk::refreshChunkSize(txn.get()); auto balSettingsResult = - grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); + grid.catalogManager(txn.get())->getGlobalSettings(SettingsType::BalancerDocKey); const bool isBalSettingsAbsent = balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { @@ -514,7 +520,7 @@ void Balancer::run() { LOG(1) << "skipping balancing round because balancing is disabled"; // Ping again so scripts can determine if we're active without waiting - _ping(true); + _ping(txn.get(), true); sleepsecs(sleepTime); continue; @@ -523,14 +529,14 @@ void Balancer::run() { uassert(13258, "oids broken after resetting!", _checkOIDs()); { - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock( + auto scopedDistLock = grid.catalogManager(txn.get())->getDistLockManager()->lock( "balancer", "doing balance round"); if (!scopedDistLock.isOK()) { LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus()); // Ping again so scripts can determine if we're active without waiting - _ping(true); + _ping(txn.get(), true); sleepsecs(sleepTime); // no need to wake up soon continue; @@ -550,14 +556,14 @@ void Balancer::run() { << (writeConcern.get() ? writeConcern->toBSON().toString() : "default"); vector<shared_ptr<MigrateInfo>> candidateChunks; - _doBalanceRound(&candidateChunks); + _doBalanceRound(txn.get(), &candidateChunks); if (candidateChunks.size() == 0) { LOG(1) << "no need to move any chunk"; _balancedLastTime = 0; } else { _balancedLastTime = - _moveChunks(candidateChunks, writeConcern.get(), waitForDelete); + _moveChunks(txn.get(), candidateChunks, writeConcern.get(), waitForDelete); } actionLog.setDetails(boost::none, @@ -566,13 +572,13 @@ void Balancer::run() { _balancedLastTime); actionLog.setTime(jsTime()); - grid.catalogManager()->logAction(actionLog); + grid.catalogManager(txn.get())->logAction(actionLog); LOG(1) << "*** end of balancing round"; } // Ping again so scripts can determine if we're active without waiting - _ping(true); + _ping(txn.get(), true); sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime); } catch (std::exception& e) { @@ -585,7 +591,7 @@ void Balancer::run() { actionLog.setDetails(string(e.what()), balanceRoundTimer.millis(), 0, 0); actionLog.setTime(jsTime()); - grid.catalogManager()->logAction(actionLog); + grid.catalogManager(txn.get())->logAction(actionLog); // Sleep a fair amount before retrying because of the error sleepsecs(sleepTime); diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index d3dc9b39045..27b9d217640 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -37,6 +37,7 @@ namespace mongo { class BalancerPolicy; struct MigrateInfo; +class OperationContext; struct WriteConcernOptions; /** @@ -92,7 +93,8 @@ private: * @param candidateChunks (IN/OUT) filled with candidate chunks, one per collection, that could * possibly be moved */ - void _doBalanceRound(std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks); + void _doBalanceRound(OperationContext* txn, + std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks); /** * Issues chunk migration request, one at a time. @@ -102,14 +104,15 @@ private: * @param waitForDelete wait for deletes to complete after each chunk move * @return number of chunks effectively moved */ - int _moveChunks(const std::vector<std::shared_ptr<MigrateInfo>>& candidateChunks, + int _moveChunks(OperationContext* txn, + const std::vector<std::shared_ptr<MigrateInfo>>& candidateChunks, const WriteConcernOptions* writeConcern, bool waitForDelete); /** * Marks this balancer as being live on the config server(s). */ - void _ping(bool waiting = false); + void _ping(OperationContext* txn, bool waiting = false); /** * @return true if all the servers listed in configdb as being shards are reachable and are diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp index 18228e092e6..55aa205dfd7 100644 --- a/src/mongo/s/balancer_policy.cpp +++ b/src/mongo/s/balancer_policy.cpp @@ -273,10 +273,10 @@ void DistributionStatus::dump() const { } } -Status DistributionStatus::populateShardInfoMap(ShardInfoMap* shardInfo) { +Status DistributionStatus::populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo) { try { vector<ShardType> shards; - Status status = grid.catalogManager()->getAllShards(&shards); + Status status = grid.catalogManager(txn)->getAllShards(&shards); if (!status.isOK()) { return status; } diff --git a/src/mongo/s/balancer_policy.h b/src/mongo/s/balancer_policy.h index 16614f424f1..e9d5c8b1f1d 100644 --- a/src/mongo/s/balancer_policy.h +++ b/src/mongo/s/balancer_policy.h @@ -38,6 +38,7 @@ namespace mongo { class ChunkManager; +class OperationContext; struct ChunkInfo { const BSONObj min; @@ -199,7 +200,7 @@ public: * Retrieves shard metadata information from the config server as well as some stats * from the shards. */ - static Status populateShardInfoMap(ShardInfoMap* shardInfo); + static Status populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo); /** * Note: jumbo and versions are not set. diff --git a/src/mongo/s/catalog/catalog_cache.cpp b/src/mongo/s/catalog/catalog_cache.cpp index 416a0a7368a..2ec63337bc9 100644 --- a/src/mongo/s/catalog/catalog_cache.cpp +++ b/src/mongo/s/catalog/catalog_cache.cpp @@ -46,7 +46,8 @@ CatalogCache::CatalogCache(CatalogManager* catalogManager) : _catalogManager(cat invariant(_catalogManager); } -StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(const string& dbName) { +StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* txn, + const string& dbName) { stdx::lock_guard<stdx::mutex> guard(_mutex); ShardedDatabasesMap::iterator it = _databases.find(dbName); @@ -61,7 +62,7 @@ StatusWith<shared_ptr<DBConfig>> CatalogCache::getDatabase(const string& dbName) } shared_ptr<DBConfig> db = std::make_shared<DBConfig>(dbName, status.getValue()); - db->load(); + db->load(txn); invariant(_databases.insert(std::make_pair(dbName, db)).second); diff --git a/src/mongo/s/catalog/catalog_cache.h b/src/mongo/s/catalog/catalog_cache.h index 2547da29960..85f5a87a494 100644 --- a/src/mongo/s/catalog/catalog_cache.h +++ b/src/mongo/s/catalog/catalog_cache.h @@ -39,6 +39,7 @@ namespace mongo { class CatalogManager; class DBConfig; +class OperationContext; template <typename T> class StatusWith; @@ -63,7 +64,8 @@ public: * @param dbname The name of the database (must not contain dots, etc). * @return The database if it exists, NULL otherwise. */ - StatusWith<std::shared_ptr<DBConfig>> getDatabase(const std::string& dbName); + StatusWith<std::shared_ptr<DBConfig>> getDatabase(OperationContext* txn, + const std::string& dbName); /** * Removes the database information for the specified name from the cache, so that the diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 4c9bac8c522..98a02836ed7 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -332,13 +332,13 @@ Status CatalogManagerLegacy::shardCollection(OperationContext* txn, txn->getClient()->clientAddress(true), "shardCollection.start", ns, collectionDetail.obj()); shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique)); - manager->createFirstChunks(dbPrimaryShardId, &initPoints, &initShardIds); - manager->loadExistingRanges(nullptr); + manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds); + manager->loadExistingRanges(txn, nullptr); CollectionInfo collInfo; collInfo.useChunkManager(manager); - collInfo.save(ns); - manager->reload(true); + collInfo.save(txn, ns); + manager->reload(txn, true); // Tell the primary mongod to refresh its data // TODO: Think the real fix here is for mongos to just diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 6b14ca34876..1d45a4988cb 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -209,13 +209,13 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, } shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique)); - manager->createFirstChunks(dbPrimaryShardId, &initPoints, &initShardIds); - manager->loadExistingRanges(nullptr); + manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds); + manager->loadExistingRanges(txn, nullptr); CollectionInfo collInfo; collInfo.useChunkManager(manager); - collInfo.save(ns); - manager->reload(true); + collInfo.save(txn, ns); + manager->reload(txn, true); // Tell the primary mongod to refresh its data // TODO: Think the real fix here is for mongos to just diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp index 477006aaa2a..6f0a0a6e818 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp @@ -130,7 +130,7 @@ void CatalogManagerReplSetTestFixture::shutdownExecutor() { } CatalogManagerReplicaSet* CatalogManagerReplSetTestFixture::catalogManager() const { - auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager()); + auto cm = dynamic_cast<CatalogManagerReplicaSet*>(grid.catalogManager(_opCtx.get())); invariant(cm); return cm; diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 5263e2cbfb7..41923e3a994 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -72,12 +72,14 @@ const int kTooManySplitPoints = 4; * * Returns true if the chunk was actually moved. */ -bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { +bool tryMoveToOtherShard(OperationContext* txn, + const ChunkManager& manager, + const ChunkType& chunk) { // reload sharding metadata before starting migration - ChunkManagerPtr chunkMgr = manager.reload(false /* just reloaded in mulitsplit */); + ChunkManagerPtr chunkMgr = manager.reload(txn, false /* just reloaded in mulitsplit */); ShardInfoMap shardInfo; - Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); + Status loadStatus = DistributionStatus::populateShardInfoMap(txn, &shardInfo); if (!loadStatus.isOK()) { warning() << "failed to load shard metadata while trying to moveChunk after " @@ -93,7 +95,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { map<string, vector<ChunkType>> shardToChunkMap; DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap); - StatusWith<string> tagStatus = grid.catalogManager()->getTagForChunk(manager.getns(), chunk); + StatusWith<string> tagStatus = grid.catalogManager(txn)->getTagForChunk(manager.getns(), chunk); if (!tagStatus.isOK()) { warning() << "Not auto-moving chunk because of an error encountered while " << "checking tag for chunk: " << tagStatus.getStatus(); @@ -114,7 +116,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { return false; } - ChunkPtr toMove = chunkMgr->findIntersectingChunk(chunk.getMin()); + ChunkPtr toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin()); if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) { LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate " @@ -132,7 +134,8 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { BSONObj res; WriteConcernOptions noThrottle; - if (!toMove->moveAndCommit(newShard->getId(), + if (!toMove->moveAndCommit(txn, + newShard->getId(), Chunk::MaxChunkSize, &noThrottle, /* secondaryThrottle */ false, /* waitForDelete - small chunk, no need */ @@ -142,7 +145,7 @@ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { } // update our config - manager.reload(); + manager.reload(txn); return true; } @@ -357,7 +360,10 @@ void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) co } } -Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const { +Status Chunk::split(OperationContext* txn, + SplitPointMode mode, + size_t* resultingSplits, + BSONObj* res) const { size_t dummy; if (resultingSplits == NULL) { resultingSplits = &dummy; @@ -416,12 +422,12 @@ Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) return Status(ErrorCodes::CannotSplit, msg); } - Status status = multiSplit(splitPoints, res); + Status status = multiSplit(txn, splitPoints, res); *resultingSplits = splitPoints.size(); return status; } -Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const { +Status Chunk::multiSplit(OperationContext* txn, const vector<BSONObj>& m, BSONObj* res) const { const size_t maxSplitPoints = 8192; uassert(10165, "can't split as shard doesn't have a manager", _manager); @@ -438,7 +444,7 @@ Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const { cmd.append("max", getMax()); cmd.append("from", getShardId()); cmd.append("splitKeys", m); - cmd.append("configdb", grid.catalogManager()->connectionString().toString()); + cmd.append("configdb", grid.catalogManager(txn)->connectionString().toString()); cmd.append("epoch", _manager->getVersion().epoch()); BSONObj cmdObj = cmd.obj(); @@ -458,12 +464,13 @@ Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const { conn.done(); // force reload of config - _manager->reload(); + _manager->reload(txn); return Status::OK(); } -bool Chunk::moveAndCommit(const ShardId& toShardId, +bool Chunk::moveAndCommit(OperationContext* txn, + const ShardId& toShardId, long long chunkSize /* bytes */, const WriteConcernOptions* writeConcern, bool waitForDelete, @@ -490,7 +497,7 @@ bool Chunk::moveAndCommit(const ShardId& toShardId, builder.append("min", _min); builder.append("max", _max); builder.append("maxChunkSizeBytes", chunkSize); - builder.append("configdb", grid.catalogManager()->connectionString().toString()); + builder.append("configdb", grid.catalogManager(txn)->connectionString().toString()); // For legacy secondary throttle setting. bool secondaryThrottle = true; @@ -517,12 +524,12 @@ bool Chunk::moveAndCommit(const ShardId& toShardId, // if succeeded, needs to reload to pick up the new location // if failed, mongos may be stale // reload is excessive here as the failure could be simply because collection metadata is taken - _manager->reload(); + _manager->reload(txn); return worked; } -bool Chunk::splitIfShould(long dataWritten) const { +bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const { dassert(ShouldAutoSplit); LastError::Disabled d(&LastError::get(cc())); @@ -555,7 +562,7 @@ bool Chunk::splitIfShould(long dataWritten) const { BSONObj res; size_t splitCount = 0; - Status status = split(Chunk::autoSplitInternal, &splitCount, &res); + Status status = split(txn, Chunk::autoSplitInternal, &splitCount, &res); if (!status.isOK()) { // Split would have issued a message if we got here. This means there wasn't enough // data to split, so don't want to try again until considerable more data @@ -571,9 +578,9 @@ bool Chunk::splitIfShould(long dataWritten) const { _dataWritten = 0; } - bool shouldBalance = grid.getConfigShouldBalance(); + bool shouldBalance = grid.getConfigShouldBalance(txn); if (shouldBalance) { - auto status = grid.catalogManager()->getCollection(_manager->getns()); + auto status = grid.catalogManager(txn)->getCollection(_manager->getns()); if (!status.isOK()) { log() << "Auto-split for " << _manager->getns() << " failed to load collection metadata due to " << status.getStatus(); @@ -603,7 +610,7 @@ bool Chunk::splitIfShould(long dataWritten) const { chunkToMove.setMin(range["min"].embeddedObject()); chunkToMove.setMax(range["max"].embeddedObject()); - tryMoveToOtherShard(*_manager, chunkToMove); + tryMoveToOtherShard(txn, *_manager, chunkToMove); } return true; @@ -694,26 +701,26 @@ string Chunk::toString() const { return ss.str(); } -void Chunk::markAsJumbo() const { +void Chunk::markAsJumbo(OperationContext* txn) const { // set this first // even if we can't set it in the db // at least this mongos won't try and keep moving _jumbo = true; - Status result = grid.catalogManager()->update(ChunkType::ConfigNS, - BSON(ChunkType::name(genID())), - BSON("$set" << BSON(ChunkType::jumbo(true))), - false, // upsert - false, // multi - NULL); + Status result = grid.catalogManager(txn)->update(ChunkType::ConfigNS, + BSON(ChunkType::name(genID())), + BSON("$set" << BSON(ChunkType::jumbo(true))), + false, // upsert + false, // multi + NULL); if (!result.isOK()) { warning() << "couldn't set jumbo for chunk: " << genID() << result.reason(); } } -void Chunk::refreshChunkSize() { +void Chunk::refreshChunkSize(OperationContext* txn) { auto chunkSizeSettingsResult = - grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey); + grid.catalogManager(txn)->getGlobalSettings(SettingsType::ChunkSizeDocKey); if (!chunkSizeSettingsResult.isOK()) { log() << chunkSizeSettingsResult.getStatus(); return; diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index d9872e77d47..1ec9a9447cb 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -34,6 +34,7 @@ namespace mongo { class ChunkManager; +class OperationContext; struct WriteConcernOptions; /** @@ -126,7 +127,7 @@ public: * then we check the real size, and if its too big, we split * @return if something was split */ - bool splitIfShould(long dataWritten) const; + bool splitIfShould(OperationContext* txn, long dataWritten) const; /** * Splits this chunk at a non-specificed split key to be chosen by the @@ -138,7 +139,10 @@ public: * * @throws UserException */ - Status split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const; + Status split(OperationContext* txn, + SplitPointMode mode, + size_t* resultingSplits, + BSONObj* res) const; /** * Splits this chunk at the given key (or keys) @@ -148,7 +152,9 @@ public: * * @throws UserException */ - Status multiSplit(const std::vector<BSONObj>& splitPoints, BSONObj* res) const; + Status multiSplit(OperationContext* txn, + const std::vector<BSONObj>& splitPoints, + BSONObj* res) const; /** * Asks the mongod holding this chunk to find a key that approximately divides this chunk in two @@ -184,7 +190,8 @@ public: * @param res the object containing details about the migrate execution * @return true if move was successful */ - bool moveAndCommit(const ShardId& to, + bool moveAndCommit(OperationContext* txn, + const ShardId& to, long long chunkSize, const WriteConcernOptions* writeConcern, bool waitForDelete, @@ -201,7 +208,7 @@ public: * marks this chunk as a jumbo chunk * that means the chunk will be inelligble for migrates */ - void markAsJumbo() const; + void markAsJumbo(OperationContext* txn) const; bool isJumbo() const { return _jumbo; @@ -210,7 +217,7 @@ public: /** * Attempt to refresh maximum chunk size from config. */ - static void refreshChunkSize(); + static void refreshChunkSize(OperationContext* txn); /** * sets MaxChunkSize diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index b200ce4e482..638935ddbf5 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -172,7 +172,7 @@ ChunkManager::ChunkManager(const CollectionType& coll) _version = ChunkVersion::fromBSON(coll.toBSON()); } -void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) { +void ChunkManager::loadExistingRanges(OperationContext* txn, const ChunkManager* oldManager) { int tries = 3; while (tries--) { @@ -182,7 +182,7 @@ void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) { Timer t; - bool success = _load(chunkMap, shardIds, &shardVersions, oldManager); + bool success = _load(txn, chunkMap, shardIds, &shardVersions, oldManager); if (success) { log() << "ChunkManager: time to load chunks for " << _ns << ": " << t.millis() << "ms" << " sequenceNumber: " << _sequenceNumber << " version: " << _version.toString() @@ -215,7 +215,8 @@ void ChunkManager::loadExistingRanges(const ChunkManager* oldManager) { << " after 3 attempts. Please try again."); } -bool ChunkManager::_load(ChunkMap& chunkMap, +bool ChunkManager::_load(OperationContext* txn, + ChunkMap& chunkMap, set<ShardId>& shardIds, ShardVersionMap* shardVersions, const ChunkManager* oldManager) { @@ -262,7 +263,7 @@ bool ChunkManager::_load(ChunkMap& chunkMap, std::vector<ChunkType> chunks; uassertStatusOK( - grid.catalogManager()->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks)); + grid.catalogManager(txn)->getChunks(diffQuery.query, diffQuery.sort, boost::none, &chunks)); int diffsApplied = differ.calculateConfigDiff(chunks); if (diffsApplied > 0) { @@ -317,12 +318,12 @@ bool ChunkManager::_load(ChunkMap& chunkMap, } } -shared_ptr<ChunkManager> ChunkManager::reload(bool force) const { +shared_ptr<ChunkManager> ChunkManager::reload(OperationContext* txn, bool force) const { const NamespaceString nss(_ns); - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); shared_ptr<DBConfig> config = uassertStatusOK(status); - return config->getChunkManager(getns(), force); + return config->getChunkManager(txn, getns(), force); } void ChunkManager::_printChunks() const { @@ -385,7 +386,8 @@ void ChunkManager::calcInitSplitsAndShards(const ShardId& primaryShardId, } } -void ChunkManager::createFirstChunks(const ShardId& primaryShardId, +void ChunkManager::createFirstChunks(OperationContext* txn, + const ShardId& primaryShardId, const vector<BSONObj>* initPoints, const set<ShardId>* initShardIds) { // TODO distlock? @@ -417,7 +419,7 @@ void ChunkManager::createFirstChunks(const ShardId& primaryShardId, BSONObj chunkObj = chunkBuilder.obj(); - Status result = grid.catalogManager()->update( + Status result = grid.catalogManager(txn)->update( ChunkType::ConfigNS, BSON(ChunkType::name(temp.genID())), chunkObj, true, false, NULL); version.incMinor(); @@ -433,7 +435,7 @@ void ChunkManager::createFirstChunks(const ShardId& primaryShardId, _version = ChunkVersion(0, 0, version.epoch()); } -ChunkPtr ChunkManager::findIntersectingChunk(const BSONObj& shardKey) const { +ChunkPtr ChunkManager::findIntersectingChunk(OperationContext* txn, const BSONObj& shardKey) const { { BSONObj chunkMin; ChunkPtr chunk; @@ -454,7 +456,7 @@ ChunkPtr ChunkManager::findIntersectingChunk(const BSONObj& shardKey) const { log() << *chunk; log() << shardKey; - reload(); + reload(txn); msgasserted(13141, "Chunk map pointed to incorrect chunk"); } } diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index cb68b42b858..0ca94237920 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -42,6 +42,7 @@ class CanonicalQuery; class ChunkManager; class CollectionType; struct QuerySolutionNode; +class OperationContext; typedef std::shared_ptr<ChunkManager> ChunkManagerPtr; @@ -159,12 +160,13 @@ public: // // Creates new chunks based on info in chunk manager - void createFirstChunks(const ShardId& primaryShardId, + void createFirstChunks(OperationContext* txn, + const ShardId& primaryShardId, const std::vector<BSONObj>* initPoints, const std::set<ShardId>* initShardIds); // Loads existing ranges based on info in chunk manager - void loadExistingRanges(const ChunkManager* oldManager); + void loadExistingRanges(OperationContext* txn, const ChunkManager* oldManager); // Helpers for load @@ -190,7 +192,7 @@ public: * when the shard key is {a : "hashed"}, you can call * findIntersectingChunk() on {a : hash("foo") } */ - ChunkPtr findIntersectingChunk(const BSONObj& shardKey) const; + ChunkPtr findIntersectingChunk(OperationContext* txn, const BSONObj& shardKey) const; void getShardIdsForQuery(std::set<ShardId>& shardIds, const BSONObj& query) const; void getAllShardIds(std::set<ShardId>* all) const; @@ -238,11 +240,13 @@ public: int getCurrentDesiredChunkSize() const; - std::shared_ptr<ChunkManager> reload(bool force = true) const; // doesn't modify self! + std::shared_ptr<ChunkManager> reload(OperationContext* txn, + bool force = true) const; // doesn't modify self! private: // returns true if load was consistent - bool _load(ChunkMap& chunks, + bool _load(OperationContext* txn, + ChunkMap& chunks, std::set<ShardId>& shardIds, ShardVersionMap* shardVersions, const ChunkManager* oldManager); diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 446d4a5fafd..ee98e9a8481 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -266,8 +266,8 @@ bool wasMetadataRefreshed(const ChunkManagerPtr& managerA, ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss) : _nss(nss), _needsTargetingRefresh(false) {} -Status ChunkManagerTargeter::init() { - auto status = grid.implicitCreateDb(_nss.db().toString()); +Status ChunkManagerTargeter::init(OperationContext* txn) { + auto status = grid.implicitCreateDb(txn, _nss.db().toString()); if (!status.isOK()) { return status.getStatus(); } @@ -282,7 +282,9 @@ const NamespaceString& ChunkManagerTargeter::getNS() const { return _nss; } -Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const { +Status ChunkManagerTargeter::targetInsert(OperationContext* txn, + const BSONObj& doc, + ShardEndpoint** endpoint) const { BSONObj shardKey; if (_manager) { @@ -310,7 +312,7 @@ Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** en // Target the shard key or database primary if (!shardKey.isEmpty()) { - return targetShardKey(shardKey, doc.objsize(), endpoint); + return targetShardKey(txn, shardKey, doc.objsize(), endpoint); } else { if (!_primary) { return Status(ErrorCodes::NamespaceNotFound, @@ -323,7 +325,8 @@ Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** en } } -Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc, +Status ChunkManagerTargeter::targetUpdate(OperationContext* txn, + const BatchedUpdateDocument& updateDoc, vector<ShardEndpoint*>* endpoints) const { // // Update targeting may use either the query or the update. This is to support save-style @@ -411,7 +414,7 @@ Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc // We can't rely on our query targeting to be exact ShardEndpoint* endpoint = NULL; Status result = - targetShardKey(shardKey, (query.objsize() + updateExpr.objsize()), &endpoint); + targetShardKey(txn, shardKey, (query.objsize() + updateExpr.objsize()), &endpoint); endpoints->push_back(endpoint); return result; } else if (updateType == UpdateType_OpStyle) { @@ -421,7 +424,8 @@ Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc } } -Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc, +Status ChunkManagerTargeter::targetDelete(OperationContext* txn, + const BatchedDeleteDocument& deleteDoc, vector<ShardEndpoint*>* endpoints) const { BSONObj shardKey; @@ -456,7 +460,7 @@ Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc if (!shardKey.isEmpty()) { // We can't rely on our query targeting to be exact ShardEndpoint* endpoint = NULL; - Status result = targetShardKey(shardKey, 0, &endpoint); + Status result = targetShardKey(txn, shardKey, 0, &endpoint); endpoints->push_back(endpoint); return result; } else { @@ -498,12 +502,13 @@ Status ChunkManagerTargeter::targetQuery(const BSONObj& query, return Status::OK(); } -Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, +Status ChunkManagerTargeter::targetShardKey(OperationContext* txn, + const BSONObj& shardKey, long long estDataSize, ShardEndpoint** endpoint) const { invariant(NULL != _manager); - ChunkPtr chunk = _manager->findIntersectingChunk(shardKey); + ChunkPtr chunk = _manager->findIntersectingChunk(txn, shardKey); // Track autosplit stats for sharded collections // Note: this is only best effort accounting and is not accurate. @@ -597,7 +602,7 @@ const TargeterStats* ChunkManagerTargeter::getStats() const { return &_stats; } -Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { +Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* txn, bool* wasChanged) { bool dummy; if (!wasChanged) { wasChanged = &dummy; @@ -620,7 +625,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { ChunkManagerPtr lastManager = _manager; ShardPtr lastPrimary = _primary; - auto status = grid.implicitCreateDb(_nss.db().toString()); + auto status = grid.implicitCreateDb(txn, _nss.db().toString()); if (!status.isOK()) { return status.getStatus(); } @@ -649,7 +654,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { // To match previous behavior, we just need an incremental refresh here - return refreshNow(RefreshType_RefreshChunkManager); + return refreshNow(txn, RefreshType_RefreshChunkManager); } *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); @@ -665,10 +670,10 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { if (result == CompareResult_Unknown) { // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow(RefreshType_ReloadDatabase); + return refreshNow(txn, RefreshType_ReloadDatabase); } else if (result == CompareResult_LT) { // Our current shard versions are less than the remote versions, but no drop - return refreshNow(RefreshType_RefreshChunkManager); + return refreshNow(txn, RefreshType_RefreshChunkManager); } *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); @@ -680,8 +685,8 @@ Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { return Status::OK(); } -Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) { - auto status = grid.implicitCreateDb(_nss.db().toString()); +Status ChunkManagerTargeter::refreshNow(OperationContext* txn, RefreshType refreshType) { + auto status = grid.implicitCreateDb(txn, _nss.db().toString()); if (!status.isOK()) { return status.getStatus(); } @@ -696,7 +701,7 @@ Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) { try { // Forces a remote check of the collection info, synchronization between threads // happens internally. - config->getChunkManagerIfExists(_nss.ns(), true); + config->getChunkManagerIfExists(txn, _nss.ns(), true); } catch (const DBException& ex) { return Status(ErrorCodes::UnknownError, ex.toString()); } @@ -705,8 +710,8 @@ Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) { try { // Dumps the db info, reloads it all, synchronization between threads happens // internally. - config->reload(); - config->getChunkManagerIfExists(_nss.ns(), true, true); + config->reload(txn); + config->getChunkManagerIfExists(txn, _nss.ns(), true, true); } catch (const DBException& ex) { return Status(ErrorCodes::UnknownError, ex.toString()); } diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index e7c754448c8..e38b9b1742f 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -38,6 +38,7 @@ namespace mongo { class ChunkManager; struct ChunkVersion; +class OperationContext; class Shard; struct TargeterStats { @@ -62,19 +63,21 @@ public: * * Returns !OK if the information could not be initialized. */ - Status init(); + Status init(OperationContext* txn); const NamespaceString& getNS() const; // Returns ShardKeyNotFound if document does not have a full shard key. - Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const; + Status targetInsert(OperationContext* txn, const BSONObj& doc, ShardEndpoint** endpoint) const; // Returns ShardKeyNotFound if the update can't be targeted without a shard key. - Status targetUpdate(const BatchedUpdateDocument& updateDoc, + Status targetUpdate(OperationContext* txn, + const BatchedUpdateDocument& updateDoc, std::vector<ShardEndpoint*>* endpoints) const; // Returns ShardKeyNotFound if the delete can't be targeted without a shard key. - Status targetDelete(const BatchedDeleteDocument& deleteDoc, + Status targetDelete(OperationContext* txn, + const BatchedDeleteDocument& deleteDoc, std::vector<ShardEndpoint*>* endpoints) const; Status targetCollection(std::vector<ShardEndpoint*>* endpoints) const; @@ -94,7 +97,7 @@ public: * * Also see NSTargeter::refreshIfNeeded(). */ - Status refreshIfNeeded(bool* wasChanged); + Status refreshIfNeeded(OperationContext* txn, bool* wasChanged); /** * Returns the stats. Note that the returned stats object is still owned by this targeter. @@ -118,7 +121,7 @@ private: /** * Performs an actual refresh from the config server. */ - Status refreshNow(RefreshType refreshType); + Status refreshNow(OperationContext* txn, RefreshType refreshType); /** * Returns a vector of ShardEndpoints where a document might need to be placed. @@ -140,7 +143,8 @@ private: * Also has the side effect of updating the chunks stats with an estimate of the amount of * data targeted at this shard key. */ - Status targetShardKey(const BSONObj& doc, + Status targetShardKey(OperationContext* txn, + const BSONObj& doc, long long estDataSize, ShardEndpoint** endpoint) const; diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index c6d765afb5a..3850e6d747f 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -278,7 +278,7 @@ public: } } - void checkVersions(const string& ns) { + void checkVersions(OperationContext* txn, const string& ns) { vector<ShardId> all; grid.shardRegistry()->getAllShardIds(&all); @@ -301,7 +301,7 @@ public: s->created++; // After, so failed creation doesn't get counted } - versionManager.checkShardVersionCB(s->avail, ns, false, 1); + versionManager.checkShardVersionCB(txn, s->avail, ns, false, 1); } catch (const DBException& ex) { warning() << "problem while initially checking shard versions on" << " " << shardId << causedBy(ex); @@ -450,7 +450,10 @@ void ShardConnection::_finishInit() { _finishedInit = true; if (versionManager.isVersionableCB(_conn)) { - _setVersion = versionManager.checkShardVersionCB(this, false, 1); + auto& client = cc(); + auto txn = client.getOperationContext(); + invariant(txn); + _setVersion = versionManager.checkShardVersionCB(txn, this, false, 1); } else { // Make sure we didn't specify a manager for a non-versionable connection (i.e. config) verify(!_manager); @@ -488,8 +491,8 @@ void ShardConnection::sync() { ClientConnections::threadInstance()->sync(); } -void ShardConnection::checkMyConnectionVersions(const string& ns) { - ClientConnections::threadInstance()->checkVersions(ns); +void ShardConnection::checkMyConnectionVersions(OperationContext* txn, const string& ns) { + ClientConnections::threadInstance()->checkVersions(txn, ns); } void ShardConnection::releaseMyConnections() { diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h index 8882961bb1c..4bac99332c0 100644 --- a/src/mongo/s/client/shard_connection.h +++ b/src/mongo/s/client/shard_connection.h @@ -103,7 +103,7 @@ public: } /** checks all of my thread local connections for the version of this ns */ - static void checkMyConnectionVersions(const std::string& ns); + static void checkMyConnectionVersions(OperationContext* txn, const std::string& ns); /** * Returns all the current sharded connections to the pool. diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index 877fc896a69..459960b83cd 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -111,12 +111,12 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { /** * Splits the chunks touched based from the targeter stats if needed. */ -void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) { +void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) { if (!Chunk::ShouldAutoSplit) { return; } - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { warning() << "failed to get database config for " << nss << " while checking for auto-split: " << status.getStatus(); @@ -138,19 +138,20 @@ void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) { ++it) { ChunkPtr chunk; try { - chunk = chunkManager->findIntersectingChunk(it->first); + chunk = chunkManager->findIntersectingChunk(txn, it->first); } catch (const AssertionException& ex) { warning() << "could not find chunk while checking for auto-split: " << causedBy(ex); return; } - chunk->splitIfShould(it->second); + chunk->splitIfShould(txn, it->second); } } } // namespace -Status clusterCreateIndex(const string& ns, +Status clusterCreateIndex(OperationContext* txn, + const string& ns, BSONObj keys, bool unique, BatchedCommandResponse* response) { @@ -173,7 +174,7 @@ Status clusterCreateIndex(const string& ns, } ClusterWriter writer(false, 0); - writer.write(request, response); + writer.write(txn, request, response); if (response->getOk() != 1) { return Status(static_cast<ErrorCodes::Error>(response->getErrCode()), @@ -198,7 +199,8 @@ Status clusterCreateIndex(const string& ns, } -void ClusterWriter::write(const BatchedCommandRequest& origRequest, +void ClusterWriter::write(OperationContext* txn, + const BatchedCommandRequest& origRequest, BatchedCommandResponse* response) { // Add _ids to insert request if req'd unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest)); @@ -242,10 +244,10 @@ void ClusterWriter::write(const BatchedCommandRequest& origRequest, const string dbName = nss.db().toString(); if (dbName == "config" || dbName == "admin") { - grid.catalogManager()->writeConfigServerDirect(request, response); + grid.catalogManager(txn)->writeConfigServerDirect(request, response); } else { ChunkManagerTargeter targeter(request.getTargetingNSS()); - Status targetInitStatus = targeter.init(); + Status targetInitStatus = targeter.init(txn); if (!targetInitStatus.isOK()) { // Errors will be reported in response if we are unable to target @@ -257,10 +259,10 @@ void ClusterWriter::write(const BatchedCommandRequest& origRequest, DBClientShardResolver resolver; DBClientMultiCommand dispatcher; BatchWriteExec exec(&targeter, &resolver, &dispatcher); - exec.executeBatch(request, response); + exec.executeBatch(txn, request, response); if (_autoSplit) { - splitIfNeeded(request.getNS(), *targeter.getStats()); + splitIfNeeded(txn, request.getNS(), *targeter.getStats()); } _stats->setShardStats(exec.releaseStats()); diff --git a/src/mongo/s/cluster_write.h b/src/mongo/s/cluster_write.h index 9fd177f756a..0ef15426da7 100644 --- a/src/mongo/s/cluster_write.h +++ b/src/mongo/s/cluster_write.h @@ -37,12 +37,15 @@ namespace mongo { class ClusterWriterStats; class BatchWriteExecStats; +class OperationContext; class ClusterWriter { public: ClusterWriter(bool autoSplit, int timeoutMillis); - void write(const BatchedCommandRequest& request, BatchedCommandResponse* response); + void write(OperationContext* txn, + const BatchedCommandRequest& request, + BatchedCommandResponse* response); const ClusterWriterStats& getStats(); @@ -73,7 +76,8 @@ private: * * Note: response can be NULL if you don't care about the write statistics. */ -Status clusterCreateIndex(const std::string& ns, +Status clusterCreateIndex(OperationContext* txn, + const std::string& ns, BSONObj keys, bool unique, BatchedCommandResponse* response); diff --git a/src/mongo/s/commands/cluster_add_shard_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_cmd.cpp index c772863d8db..7f0a8e5dbd0 100644 --- a/src/mongo/s/commands/cluster_add_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_add_shard_cmd.cpp @@ -120,7 +120,7 @@ public: audit::logAddShard(ClientBasic::getCurrent(), name, servers.toString(), maxSize); - StatusWith<string> addShardResult = grid.catalogManager()->addShard( + StatusWith<string> addShardResult = grid.catalogManager(txn)->addShard( txn, (name.empty() ? nullptr : &name), servers, maxSize); if (!addShardResult.isOK()) { log() << "addShard request '" << cmdObj << "'" diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index 93a87c9f497..89b623f7078 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -88,7 +88,7 @@ void Future::CommandResult::init() { } } -bool Future::CommandResult::join(int maxRetries) { +bool Future::CommandResult::join(OperationContext* txn, int maxRetries) { if (_done) { return _ok; } @@ -129,7 +129,7 @@ bool Future::CommandResult::join(int maxRetries) { } if (i >= maxRetries / 2) { - if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) { + if (!versionManager.forceRemoteCheckShardVersionCB(txn, staleNS)) { error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e); throw e; } @@ -141,7 +141,7 @@ bool Future::CommandResult::join(int maxRetries) { warning() << "no collection namespace in stale config exception " << "for lazy command " << _cmd << ", could not refresh " << staleNS; } else { - versionManager.checkShardVersionCB(_conn, staleNS, false, 1); + versionManager.checkShardVersionCB(txn, _conn, staleNS, false, 1); } LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e); diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index b1b41459f01..39ef671c39c 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -74,7 +74,7 @@ public: blocks until command is done returns ok() */ - bool join(int maxRetries = 1); + bool join(OperationContext* txn, int maxRetries = 1); private: CommandResult(const std::string& server, diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index 3ceef4cc01a..3253afc6ba2 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -154,7 +154,8 @@ public: } vector<Strategy::CommandResult> countResult; - Strategy::commandOp(dbname, countCmdBuilder.done(), options, fullns, filter, &countResult); + Strategy::commandOp( + txn, dbname, countCmdBuilder.done(), options, fullns, filter, &countResult); long long total = 0; BSONObjBuilder shardSubTotal(result.subobjStart("shards")); @@ -212,7 +213,8 @@ public: Timer timer; vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults); + Strategy::commandOp( + txn, dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults); long long millisElapsed = timer.millis(); diff --git a/src/mongo/s/commands/cluster_drop_database_cmd.cpp b/src/mongo/s/commands/cluster_drop_database_cmd.cpp index 8f2781fb4dc..e913c5f8a7e 100644 --- a/src/mongo/s/commands/cluster_drop_database_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_database_cmd.cpp @@ -90,7 +90,7 @@ public: // Refresh the database metadata grid.catalogCache()->invalidate(dbname); - auto status = grid.catalogCache()->getDatabase(dbname); + auto status = grid.catalogCache()->getDatabase(txn, dbname); if (!status.isOK()) { if (status == ErrorCodes::DatabaseNotFound) { result.append("info", "database does not exist"); diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp index 62749bfbab2..37ef726dfb1 100644 --- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp +++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp @@ -104,7 +104,7 @@ public: return false; } - Status status = grid.catalogManager()->enableSharding(dbname); + Status status = grid.catalogManager(txn)->enableSharding(dbname); if (status.isOK()) { audit::logEnableSharding(ClientBasic::getCurrent(), dbname); } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 9bd33cb24f0..fe9ab7787d5 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -82,7 +82,7 @@ public: BSONObjBuilder* out) const { const string ns = parseNsCollectionRequired(dbName, cmdObj); - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); uassertStatusOK(status); shared_ptr<DBConfig> conf = status.getValue(); @@ -92,7 +92,7 @@ public: if (!conf->isShardingEnabled() || !conf->isSharded(ns)) { shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); } else { - shared_ptr<ChunkManager> chunkMgr = _getChunkManager(conf, ns); + shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns); const BSONObj query = cmdObj.getObjectField("query"); @@ -102,7 +102,7 @@ public: } BSONObj shardKey = status.getValue(); - ChunkPtr chunk = chunkMgr->findIntersectingChunk(shardKey); + ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey); shard = grid.shardRegistry()->getShard(chunk->getShardId()); } @@ -145,12 +145,12 @@ public: // findAndModify should only be creating database if upsert is true, but this would // require that the parsing be pulled into this function. - auto conf = uassertStatusOK(grid.implicitCreateDb(dbName)); + auto conf = uassertStatusOK(grid.implicitCreateDb(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(ns)) { return _runCommand(conf, conf->getPrimaryId(), ns, cmdObj, result); } - shared_ptr<ChunkManager> chunkMgr = _getChunkManager(conf, ns); + shared_ptr<ChunkManager> chunkMgr = _getChunkManager(txn, conf, ns); const BSONObj query = cmdObj.getObjectField("query"); @@ -161,13 +161,13 @@ public: } BSONObj shardKey = status.getValue(); - ChunkPtr chunk = chunkMgr->findIntersectingChunk(shardKey); + ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey); bool ok = _runCommand(conf, chunk->getShardId(), ns, cmdObj, result); if (ok) { // check whether split is necessary (using update object for size heuristic) if (Chunk::ShouldAutoSplit) { - chunk->splitIfShould(cmdObj.getObjectField("update").objsize()); + chunk->splitIfShould(txn, cmdObj.getObjectField("update").objsize()); } } @@ -175,8 +175,10 @@ public: } private: - shared_ptr<ChunkManager> _getChunkManager(shared_ptr<DBConfig> conf, const string& ns) const { - shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(ns); + shared_ptr<ChunkManager> _getChunkManager(OperationContext* txn, + shared_ptr<DBConfig> conf, + const string& ns) const { + shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(txn, ns); massert(13002, "shard internal error chunk manager should never be null", chunkMgr); return chunkMgr; diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index c00cc24368f..8fe8b2724a1 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -130,7 +130,8 @@ public: Timer timer; vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(dbname, + Strategy::commandOp(txn, + dbname, explainCmdBob.obj(), lpq->getOptions(), fullns, diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp index 54d88f7d6bb..9ac3d9bdb61 100644 --- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp @@ -94,7 +94,7 @@ public: result, Status(ErrorCodes::InvalidNamespace, "no namespace specified")); } - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -106,7 +106,7 @@ public: Status(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.")); } - ChunkManagerPtr cm = config->getChunkManagerIfExists(nss.ns()); + ChunkManagerPtr cm = config->getChunkManagerIfExists(txn, nss.ns()); if (!cm) { errmsg = "no chunk manager?"; return false; diff --git a/src/mongo/s/commands/cluster_index_filter_cmd.cpp b/src/mongo/s/commands/cluster_index_filter_cmd.cpp index d3465a4bb48..05c5724ee3e 100644 --- a/src/mongo/s/commands/cluster_index_filter_cmd.cpp +++ b/src/mongo/s/commands/cluster_index_filter_cmd.cpp @@ -120,7 +120,7 @@ bool ClusterIndexFilterCmd::run(OperationContext* txn, // Targeted shard commands are generally data-dependent but index filter // commands are tied to query shape (data has no effect on query shape). vector<Strategy::CommandResult> results; - Strategy::commandOp(dbName, cmdObj, options, nss.ns(), BSONObj(), &results); + Strategy::commandOp(txn, dbName, cmdObj, options, nss.ns(), BSONObj(), &results); // Set value of first shard result's "ok" field. bool clusterCmdResult = true; diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp index 98c062c9441..213bcc17b1c 100644 --- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp @@ -158,11 +158,12 @@ public: bb.append(temp.obj()); } + auto catalogManager = grid.catalogManager(txn); { // get config db from the config servers BSONObjBuilder builder; - if (!grid.catalogManager()->runReadCommand("config", BSON("dbstats" << 1), &builder)) { + if (!catalogManager->runReadCommand("config", BSON("dbstats" << 1), &builder)) { bb.append(BSON("name" << "config")); } else { @@ -183,7 +184,7 @@ public: // get admin db from the config servers BSONObjBuilder builder; - if (!grid.catalogManager()->runReadCommand("admin", BSON("dbstats" << 1), &builder)) { + if (!catalogManager->runReadCommand("admin", BSON("dbstats" << 1), &builder)) { bb.append(BSON("name" << "admin")); } else { diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp index 16eccd3c8e1..da852b42ee7 100644 --- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp @@ -74,7 +74,7 @@ public: std::string& errmsg, BSONObjBuilder& result) { std::vector<ShardType> shards; - Status status = grid.catalogManager()->getAllShards(&shards); + Status status = grid.catalogManager(txn)->getAllShards(&shards); if (!status.isOK()) { return appendCommandStatus(result, status); } diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index c6ed59b111f..7da7568360a 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -212,7 +212,7 @@ public: } // Ensure the input database exists - auto status = grid.catalogCache()->getDatabase(dbname); + auto status = grid.catalogCache()->getDatabase(txn, dbname); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -222,7 +222,7 @@ public: shared_ptr<DBConfig> confOut; if (customOutDB) { // Create the output database implicitly, since we have a custom output requested - confOut = uassertStatusOK(grid.implicitCreateDb(outDB)); + confOut = uassertStatusOK(grid.implicitCreateDb(txn, outDB)); } else { confOut = confIn; } @@ -301,7 +301,7 @@ public: // TODO: take distributed lock to prevent split / migration? try { - Strategy::commandOp(dbname, shardedCommand, 0, fullns, q, &mrCommandResults); + Strategy::commandOp(txn, dbname, shardedCommand, 0, fullns, q, &mrCommandResults); } catch (DBException& e) { e.addContext(str::stream() << "could not run map command on all shards for ns " << fullns << " and query " << q); @@ -422,7 +422,7 @@ public: // Create the sharded collection if needed if (!confOut->isSharded(finalColLong)) { // Enable sharding on db - confOut->enableSharding(); + confOut->enableSharding(txn); // Shard collection according to split points vector<BSONObj> sortedSplitPts; @@ -444,7 +444,7 @@ public: BSONObj sortKey = BSON("_id" << 1); ShardKeyPattern sortKeyPattern(sortKey); - Status status = grid.catalogManager()->shardCollection( + Status status = grid.catalogManager(txn)->shardCollection( txn, finalColLong, sortKeyPattern, true, sortedSplitPts, outShardIds); if (!status.isOK()) { return appendCommandStatus(result, status); @@ -454,7 +454,7 @@ public: map<BSONObj, int> chunkSizes; { // Take distributed lock to prevent split / migration. - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock( + auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock( finalColLong, "mr-post-process", stdx::chrono::milliseconds(-1), // retry indefinitely @@ -469,7 +469,7 @@ public: try { Strategy::commandOp( - outDB, finalCmdObj, 0, finalColLong, BSONObj(), &mrCommandResults); + txn, outDB, finalCmdObj, 0, finalColLong, BSONObj(), &mrCommandResults); ok = true; } catch (DBException& e) { e.addContext(str::stream() << "could not run final reduce on all shards for " @@ -511,19 +511,19 @@ public: } // Do the splitting round - ChunkManagerPtr cm = confOut->getChunkManagerIfExists(finalColLong); + ChunkManagerPtr cm = confOut->getChunkManagerIfExists(txn, finalColLong); for (const auto& chunkSize : chunkSizes) { BSONObj key = chunkSize.first; const int size = chunkSize.second; invariant(size < std::numeric_limits<int>::max()); // key reported should be the chunk's minimum - ChunkPtr c = cm->findIntersectingChunk(key); + ChunkPtr c = cm->findIntersectingChunk(txn, key); if (!c) { warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; } else { - c->splitIfShould(size); + c->splitIfShould(txn, size); } } } diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index d57e2de1cce..ffce75566fc 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -138,7 +138,7 @@ public: result, Status(ErrorCodes::InvalidNamespace, "no namespace specified")); } - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -151,7 +151,7 @@ public: } // This refreshes the chunk metadata if stale. - ChunkManagerPtr manager = config->getChunkManagerIfExists(nss.ns(), true); + ChunkManagerPtr manager = config->getChunkManagerIfExists(txn, nss.ns(), true); if (!manager) { return appendCommandStatus( result, @@ -170,14 +170,14 @@ public: minKey = manager->getShardKeyPattern().normalizeShardKey(minKey); maxKey = manager->getShardKeyPattern().normalizeShardKey(maxKey); - ChunkPtr firstChunk = manager->findIntersectingChunk(minKey); + ChunkPtr firstChunk = manager->findIntersectingChunk(txn, minKey); verify(firstChunk); BSONObjBuilder remoteCmdObjB; remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::nsField()]); remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::boundsField()]); remoteCmdObjB.append(ClusterMergeChunksCommand::configField(), - grid.catalogManager()->connectionString().toString()); + grid.catalogManager(txn)->connectionString().toString()); remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk->getShardId()); BSONObj remoteResult; diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 45b9c63cdec..9dc54bb633c 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -116,7 +116,7 @@ public: result, Status(ErrorCodes::InvalidNamespace, "no namespace specified")); } - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -125,7 +125,7 @@ public: } if (!config->isSharded(nss.ns())) { - config->reload(); + config->reload(txn); if (!config->isSharded(nss.ns())) { return appendCommandStatus(result, @@ -164,7 +164,7 @@ public: } // This refreshes the chunk metadata if stale. - ChunkManagerPtr info = config->getChunkManager(nss.ns(), true); + ChunkManagerPtr info = config->getChunkManager(txn, nss.ns(), true); ChunkPtr chunk; if (!find.isEmpty()) { @@ -181,7 +181,7 @@ public: return false; } - chunk = info->findIntersectingChunk(shardKey); + chunk = info->findIntersectingChunk(txn, shardKey); verify(chunk.get()); } else { // Bounds @@ -197,7 +197,7 @@ public: BSONObj minKey = info->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); BSONObj maxKey = info->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk = info->findIntersectingChunk(minKey); + chunk = info->findIntersectingChunk(txn, minKey); verify(chunk.get()); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { @@ -238,7 +238,8 @@ public: } BSONObj res; - if (!chunk->moveAndCommit(to->getId(), + if (!chunk->moveAndCommit(txn, + to->getId(), maxChunkSizeBytes, writeConcern.get(), cmdObj["_waitForDelete"].trueValue(), diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index ca28077860a..1a4788d28f0 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -114,7 +114,7 @@ public: // Flush all cached information. This can't be perfect, but it's better than nothing. grid.catalogCache()->invalidate(dbname); - auto status = grid.catalogCache()->getDatabase(dbname); + auto status = grid.catalogCache()->getDatabase(txn, dbname); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -147,8 +147,9 @@ public: << " to: " << toShard->toString(); string whyMessage(str::stream() << "Moving primary shard of " << dbname); + auto catalogManager = grid.catalogManager(txn); auto scopedDistLock = - grid.catalogManager()->getDistLockManager()->lock(dbname + "-movePrimary", whyMessage); + catalogManager->getDistLockManager()->lock(dbname + "-movePrimary", whyMessage); if (!scopedDistLock.isOK()) { return appendCommandStatus(result, scopedDistLock.getStatus()); @@ -161,7 +162,7 @@ public: BSONObj moveStartDetails = _buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls); - grid.catalogManager()->logChange( + catalogManager->logChange( txn->getClient()->clientAddress(true), "movePrimary.start", dbname, moveStartDetails); BSONArrayBuilder barr; @@ -189,7 +190,7 @@ public: ScopedDbConnection fromconn(fromShard->getConnString()); - config->setPrimary(toShard->getConnString().toString()); + config->setPrimary(txn, toShard->getConnString().toString()); if (shardedColls.empty()) { // TODO: Collections can be created in the meantime, and we should handle in the future. @@ -240,7 +241,7 @@ public: BSONObj moveFinishDetails = _buildMoveEntry(dbname, oldPrimary, toShard->toString(), shardedColls); - grid.catalogManager()->logChange( + catalogManager->logChange( txn->getClient()->clientAddress(true), "movePrimary", dbname, moveFinishDetails); return true; } diff --git a/src/mongo/s/commands/cluster_netstat_cmd.cpp b/src/mongo/s/commands/cluster_netstat_cmd.cpp index df4c158a8b7..66a9bc8c036 100644 --- a/src/mongo/s/commands/cluster_netstat_cmd.cpp +++ b/src/mongo/s/commands/cluster_netstat_cmd.cpp @@ -69,7 +69,7 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) { - result.append("configserver", grid.catalogManager()->connectionString().toString()); + result.append("configserver", grid.catalogManager(txn)->connectionString().toString()); result.append("isdbgrid", 1); return true; } diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index da0a5dfcd12..ce272013709 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -102,7 +102,7 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbname, cmdObj); - auto status = grid.catalogCache()->getDatabase(dbname); + auto status = grid.catalogCache()->getDatabase(txn, dbname); if (!status.isOK()) { return appendEmptyResultSet(result, status.getStatus(), fullns); } @@ -137,7 +137,7 @@ public: // If the first $match stage is an exact match on the shard key, we only have to send it // to one shard, so send the command to that shard. BSONObj firstMatchQuery = pipeline->getInitialQuery(); - ChunkManagerPtr chunkMgr = conf->getChunkManager(fullns); + ChunkManagerPtr chunkMgr = conf->getChunkManager(txn, fullns); BSONObj shardKeyMatches = uassertStatusOK( chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(firstMatchQuery)); @@ -173,7 +173,8 @@ public: // Run the command on the shards // TODO need to make sure cursors are killed if a retry is needed vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(dbname, shardedCommand, options, fullns, shardQuery, &shardResults); + Strategy::commandOp( + txn, dbname, shardedCommand, options, fullns, shardQuery, &shardResults); if (pipeline->isExplain()) { // This must be checked before we start modifying result. diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp index 1411f5bc3b1..42f240b7dd3 100644 --- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp +++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp @@ -120,7 +120,7 @@ bool ClusterPlanCacheCmd::run(OperationContext* txn, // Targeted shard commands are generally data-dependent but plan cache // commands are tied to query shape (data has no effect on query shape). vector<Strategy::CommandResult> results; - Strategy::commandOp(dbName, cmdObj, options, nss.ns(), BSONObj(), &results); + Strategy::commandOp(txn, dbName, cmdObj, options, nss.ns(), BSONObj(), &results); // Set value of first shard result's "ok" field. bool clusterCmdResult = true; diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp index b6cc17b9545..fc958920407 100644 --- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp @@ -94,14 +94,15 @@ public: return appendCommandStatus(result, Status(ErrorCodes::ShardNotFound, msg)); } + auto catalogManager = grid.catalogManager(txn); StatusWith<ShardDrainingStatus> removeShardResult = - grid.catalogManager()->removeShard(txn, s->getId()); + catalogManager->removeShard(txn, s->getId()); if (!removeShardResult.isOK()) { return appendCommandStatus(result, removeShardResult.getStatus()); } vector<string> databases; - grid.catalogManager()->getDatabasesForShard(s->getId(), &databases); + catalogManager->getDatabasesForShard(s->getId(), &databases); // Get BSONObj containing: // 1) note about moving or dropping databases in a shard @@ -131,10 +132,10 @@ public: break; case ShardDrainingStatus::ONGOING: { vector<ChunkType> chunks; - Status status = grid.catalogManager()->getChunks(BSON(ChunkType::shard(s->getId())), - BSONObj(), - boost::none, // return all - &chunks); + Status status = catalogManager->getChunks(BSON(ChunkType::shard(s->getId())), + BSONObj(), + boost::none, // return all + &chunks); if (!status.isOK()) { return appendCommandStatus(result, status); } diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index 1cde82d9bde..c84946580f4 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -120,7 +120,7 @@ public: Status(ErrorCodes::InvalidNamespace, "invalid collection namespace [" + ns + "]")); } - auto config = uassertStatusOK(grid.catalogCache()->getDatabase(nsStr.db().toString())); + auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString())); if (!config->isShardingEnabled()) { return appendCommandStatus( result, @@ -324,7 +324,7 @@ public: // 5. If no useful index exists, and collection empty, create one on proposedKey. // Only need to call ensureIndex on primary shard, since indexes get copied to // receiving shard whenever a migrate occurs. - Status status = clusterCreateIndex(ns, proposedKey, careAboutUnique, NULL); + Status status = clusterCreateIndex(txn, ns, proposedKey, careAboutUnique, NULL); if (!status.isOK()) { errmsg = str::stream() << "ensureIndex failed to create index on " << "primary shard: " << status.reason(); @@ -397,7 +397,7 @@ public: audit::logShardCollection(ClientBasic::getCurrent(), ns, proposedKey, careAboutUnique); - Status status = grid.catalogManager()->shardCollection( + Status status = grid.catalogManager(txn)->shardCollection( txn, ns, proposedShardKey, careAboutUnique, initSplits, std::set<ShardId>{}); if (!status.isOK()) { return appendCommandStatus(result, status); @@ -409,7 +409,7 @@ public: if (isHashedShardKey && isEmpty) { // Reload the new config info. If we created more than one initial chunk, then // we need to move them around to balance. - ChunkManagerPtr chunkManager = config->getChunkManager(ns, true); + ChunkManagerPtr chunkManager = config->getChunkManager(txn, ns, true); ChunkMap chunkMap = chunkManager->getChunkMap(); // 2. Move and commit each "big chunk" to a different shard. @@ -431,7 +431,7 @@ public: BSONObj moveResult; WriteConcernOptions noThrottle; if (!chunk->moveAndCommit( - to->getId(), Chunk::MaxChunkSize, &noThrottle, true, 0, moveResult)) { + txn, to->getId(), Chunk::MaxChunkSize, &noThrottle, true, 0, moveResult)) { warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to << " while sharding collection " << ns << "." << " Reason: " << moveResult; @@ -443,17 +443,17 @@ public: } // Reload the config info, after all the migrations - chunkManager = config->getChunkManager(ns, true); + chunkManager = config->getChunkManager(txn, ns, true); // 3. Subdivide the big chunks by splitting at each of the points in "allSplits" // that we haven't already split by. - ChunkPtr currentChunk = chunkManager->findIntersectingChunk(allSplits[0]); + ChunkPtr currentChunk = chunkManager->findIntersectingChunk(txn, allSplits[0]); vector<BSONObj> subSplits; for (unsigned i = 0; i <= allSplits.size(); i++) { if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) { if (!subSplits.empty()) { - Status status = currentChunk->multiSplit(subSplits, NULL); + Status status = currentChunk->multiSplit(txn, subSplits, NULL); if (!status.isOK()) { warning() << "couldn't split chunk " << currentChunk->toString() << " while sharding collection " << ns << causedBy(status); @@ -463,7 +463,7 @@ public: } if (i < allSplits.size()) { - currentChunk = chunkManager->findIntersectingChunk(allSplits[i]); + currentChunk = chunkManager->findIntersectingChunk(txn, allSplits[i]); } } else { BSONObj splitPoint(allSplits[i]); @@ -479,7 +479,7 @@ public: // Proactively refresh the chunk manager. Not really necessary, but this way it's // immediately up-to-date the next time it's used. - config->getChunkManager(ns, true); + config->getChunkManager(txn, ns, true); } return true; diff --git a/src/mongo/s/commands/cluster_split_collection_cmd.cpp b/src/mongo/s/commands/cluster_split_collection_cmd.cpp index fd5f462518c..06a50c1d3cf 100644 --- a/src/mongo/s/commands/cluster_split_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_collection_cmd.cpp @@ -108,14 +108,14 @@ public: result, Status(ErrorCodes::InvalidNamespace, "no namespace specified")); } - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } std::shared_ptr<DBConfig> config = status.getValue(); if (!config->isSharded(nss.ns())) { - config->reload(); + config->reload(txn); if (!config->isSharded(nss.ns())) { return appendCommandStatus(result, @@ -178,7 +178,7 @@ public: } // This refreshes the chunk metadata if stale. - ChunkManagerPtr info = config->getChunkManager(nss.ns(), true); + ChunkManagerPtr info = config->getChunkManager(txn, nss.ns(), true); ChunkPtr chunk; if (!find.isEmpty()) { @@ -195,7 +195,7 @@ public: return false; } - chunk = info->findIntersectingChunk(shardKey); + chunk = info->findIntersectingChunk(txn, shardKey); invariant(chunk.get()); } else if (!bounds.isEmpty()) { if (!info->getShardKeyPattern().isShardKey(bounds[0].Obj()) || @@ -210,7 +210,7 @@ public: BSONObj minKey = info->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); BSONObj maxKey = info->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk = info->findIntersectingChunk(minKey); + chunk = info->findIntersectingChunk(txn, minKey); invariant(chunk.get()); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { @@ -235,7 +235,7 @@ public: return appendCommandStatus(result, status); } - chunk = info->findIntersectingChunk(middle); + chunk = info->findIntersectingChunk(txn, middle); invariant(chunk.get()); if (chunk->getMin().woCompare(middle) == 0 || chunk->getMax().woCompare(middle) == 0) { @@ -252,7 +252,7 @@ public: BSONObj res; if (middle.isEmpty()) { - Status status = chunk->split(Chunk::atMedian, NULL, NULL); + Status status = chunk->split(txn, Chunk::atMedian, NULL, NULL); if (!status.isOK()) { errmsg = "split failed"; result.append("cause", status.toString()); @@ -262,7 +262,7 @@ public: vector<BSONObj> splitPoints; splitPoints.push_back(middle); - Status status = chunk->multiSplit(splitPoints, NULL); + Status status = chunk->multiSplit(txn, splitPoints, NULL); if (!status.isOK()) { errmsg = "split failed"; result.append("cause", status.toString()); diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp index d2a6ab667d4..7985a305503 100644 --- a/src/mongo/s/commands/cluster_user_management_commands.cpp +++ b/src/mongo/s/commands/cluster_user_management_commands.cpp @@ -78,8 +78,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + return grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); } virtual void redactForLogging(mutablebson::Document* cmdObj) { @@ -121,8 +121,8 @@ public: if (!status.isOK()) { return appendCommandStatus(result, status); } - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -172,8 +172,8 @@ public: if (!status.isOK()) { return appendCommandStatus(result, status); } - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -212,8 +212,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -260,8 +260,8 @@ public: if (!status.isOK()) { return appendCommandStatus(result, status); } - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -308,8 +308,8 @@ public: if (!status.isOK()) { return appendCommandStatus(result, status); } - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -352,7 +352,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result); + return grid.catalogManager(txn)->runUserManagementReadCommand(dbname, cmdObj, &result); } } cmdUsersInfo; @@ -385,8 +385,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + return grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); } } cmdCreateRole; @@ -419,8 +419,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -459,8 +459,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -499,8 +499,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -539,8 +539,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -579,8 +579,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -622,8 +622,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -666,8 +666,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - const bool ok = grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + const bool ok = grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); AuthorizationManager* authzManager = getGlobalAuthorizationManager(); invariant(authzManager); @@ -710,7 +710,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runUserManagementReadCommand(dbname, cmdObj, &result); + return grid.catalogManager(txn)->runUserManagementReadCommand(dbname, cmdObj, &result); } } cmdRolesInfo; @@ -797,8 +797,8 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - return grid.catalogManager()->runUserManagementWriteCommand( - this->name, dbname, cmdObj, &result); + return grid.catalogManager(txn) + ->runUserManagementWriteCommand(this->name, dbname, cmdObj, &result); } } cmdMergeAuthzCollections; diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index a39d5f75439..cea3c22307b 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -120,7 +120,7 @@ public: BatchItemRef targetingBatchItem(&request, 0); vector<Strategy::CommandResult> shardResults; Status status = - _commandOpWrite(dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults); + _commandOpWrite(txn, dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults); if (!status.isOK()) { return status; } @@ -153,7 +153,7 @@ public: response.setErrCode(ErrorCodes::FailedToParse); response.setErrMessage(errmsg); } else { - writer.write(request, &response); + writer.write(txn, request, &response); } dassert(response.isValid(NULL)); @@ -222,7 +222,8 @@ private: * * Does *not* retry or retarget if the metadata is stale. */ - static Status _commandOpWrite(const std::string& dbName, + static Status _commandOpWrite(OperationContext* txn, + const std::string& dbName, const BSONObj& command, BatchItemRef targetingBatchItem, std::vector<Strategy::CommandResult>* results) { @@ -231,7 +232,7 @@ private: ChunkManagerTargeter targeter( NamespaceString(targetingBatchItem.getRequest()->getTargetingNS())); - Status status = targeter.init(); + Status status = targeter.init(txn); if (!status.isOK()) return status; @@ -240,17 +241,17 @@ private: if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { ShardEndpoint* endpoint; - Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint); + Status status = targeter.targetInsert(txn, targetingBatchItem.getDocument(), &endpoint); if (!status.isOK()) return status; endpoints.push_back(endpoint); } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { - Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints); + Status status = targeter.targetUpdate(txn, *targetingBatchItem.getUpdate(), &endpoints); if (!status.isOK()) return status; } else { invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete); - Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints); + Status status = targeter.targetDelete(txn, *targetingBatchItem.getDelete(), &endpoints); if (!status.isOK()) return status; } diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 6bb4d896b43..0e2e18b86b8 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -143,10 +143,13 @@ public: bool implicitCreateDb = false) : RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {} - virtual void getShardIds(const string& dbName, BSONObj& cmdObj, vector<ShardId>& shardIds) { + virtual void getShardIds(OperationContext* txn, + const string& dbName, + BSONObj& cmdObj, + vector<ShardId>& shardIds) { const string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe(); - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); uassertStatusOK(status.getStatus()); shared_ptr<DBConfig> conf = status.getValue(); @@ -172,7 +175,7 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isSharded(fullns)) { return passthrough(conf, cmdObj, options, result); } @@ -397,7 +400,7 @@ public: int, string& errmsg, BSONObjBuilder& result) { - auto status = grid.implicitCreateDb(dbName); + auto status = grid.implicitCreateDb(txn, dbName); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -426,7 +429,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { if (status == ErrorCodes::DatabaseNotFound) { return true; @@ -445,7 +448,7 @@ public: return passthrough(db, cmdObj, result); } - uassertStatusOK(grid.catalogManager()->dropCollection(txn, NamespaceString(fullns))); + uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(fullns))); // Force a full reload next time the just dropped namespace is accessed db->invalidateNs(fullns); @@ -474,11 +477,11 @@ public: BSONObjBuilder& result) { const string fullnsFrom = cmdObj.firstElement().valuestrsafe(); const string dbNameFrom = nsToDatabase(fullnsFrom); - auto confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(dbNameFrom)); + auto confFrom = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameFrom)); const string fullnsTo = cmdObj["to"].valuestrsafe(); const string dbNameTo = nsToDatabase(fullnsTo); - auto confTo = uassertStatusOK(grid.catalogCache()->getDatabase(dbNameTo)); + auto confTo = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbNameTo)); uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom)); uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo)); @@ -516,7 +519,7 @@ public: uassert(ErrorCodes::EmptyFieldName, "missing todb argument", !todb.empty()); uassert(ErrorCodes::InvalidNamespace, "invalid todb argument", nsIsDbOnly(todb)); - auto confTo = uassertStatusOK(grid.implicitCreateDb(todb)); + auto confTo = uassertStatusOK(grid.implicitCreateDb(txn, todb)); uassert(ErrorCodes::IllegalOperation, "cannot copy to a sharded database", !confTo->isShardingEnabled()); @@ -529,7 +532,7 @@ public: uassert(13399, "need a fromdb argument", !fromdb.empty()); shared_ptr<DBConfig> confFrom = - uassertStatusOK(grid.catalogCache()->getDatabase(fromdb)); + uassertStatusOK(grid.catalogCache()->getDatabase(txn, fromdb)); uassert(13400, "don't know where source DB is", confFrom); uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled()); @@ -572,7 +575,7 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { result.appendBool("sharded", false); result.append("primary", conf->getPrimaryId()); @@ -582,7 +585,7 @@ public: result.appendBool("sharded", true); - ChunkManagerPtr cm = conf->getChunkManager(fullns); + ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(12594, "how could chunk manager be null!", cm); BSONObjBuilder shardStats; @@ -733,12 +736,12 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(conf, cmdObj, result); } - ChunkManagerPtr cm = conf->getChunkManager(fullns); + ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13407, "how could chunk manager be null!", cm); BSONObj min = cmdObj.getObjectField("min"); @@ -841,8 +844,8 @@ public: Timer timer; Strategy::CommandResult singleResult; - Status commandStat = - Strategy::commandOpUnsharded(dbname, explainCmdBob.obj(), 0, fullns, &singleResult); + Status commandStat = Strategy::commandOpUnsharded( + txn, dbname, explainCmdBob.obj(), 0, fullns, &singleResult); if (!commandStat.isOK()) { return commandStat; } @@ -918,7 +921,7 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { return appendEmptyResultSet(result, status.getStatus(), fullns); } @@ -928,7 +931,7 @@ public: return passthrough(conf, cmdObj, options, result); } - ChunkManagerPtr cm = conf->getChunkManager(fullns); + ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(10420, "how could chunk manager be null!", cm); BSONObj query = getQuery(cmdObj); @@ -997,7 +1000,8 @@ public: Timer timer; vector<Strategy::CommandResult> shardResults; - Strategy::commandOp(dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults); + Strategy::commandOp( + txn, dbname, explainCmdBob.obj(), 0, fullns, targetingQuery, &shardResults); long long millisElapsed = timer.millis(); @@ -1037,18 +1041,18 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(conf, cmdObj, result); } - ChunkManagerPtr cm = conf->getChunkManager(fullns); + ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13091, "how could chunk manager be null!", cm); if (cm->getShardKeyPattern().toBSON() == BSON("files_id" << 1)) { BSONObj finder = BSON("files_id" << cmdObj.firstElement()); vector<Strategy::CommandResult> results; - Strategy::commandOp(dbName, cmdObj, 0, fullns, finder, &results); + Strategy::commandOp(txn, dbName, cmdObj, 0, fullns, finder, &results); verify(results.size() == 1); // querying on shard key so should only talk to one shard BSONObj res = results.begin()->result; @@ -1080,7 +1084,7 @@ public: vector<Strategy::CommandResult> results; try { - Strategy::commandOp(dbName, shardCmd, 0, fullns, finder, &results); + Strategy::commandOp(txn, dbName, shardCmd, 0, fullns, finder, &results); } catch (DBException& e) { // This is handled below and logged Strategy::CommandResult errResult; @@ -1165,12 +1169,12 @@ public: BSONObjBuilder& result) { const string fullns = parseNs(dbName, cmdObj); - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); if (!conf->isShardingEnabled() || !conf->isSharded(fullns)) { return passthrough(conf, cmdObj, options, result); } - ChunkManagerPtr cm = conf->getChunkManager(fullns); + ChunkManagerPtr cm = conf->getChunkManager(txn, fullns); massert(13500, "how could chunk manager be null!", cm); BSONObj query = getQuery(cmdObj); @@ -1206,7 +1210,7 @@ public: i != futures.end(); i++) { shared_ptr<Future::CommandResult> res = *i; - if (!res->join()) { + if (!res->join(txn)) { errmsg = res->result()["errmsg"].String(); if (res->result().hasField("code")) { result.append(res->result()["code"]); @@ -1331,7 +1335,7 @@ public: // $eval isn't allowed to access sharded collections, but we need to leave the // shard to detect that. - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { return appendCommandStatus(result, status.getStatus()); } @@ -1370,7 +1374,7 @@ public: int, string& errmsg, BSONObjBuilder& result) { - auto status = grid.catalogCache()->getDatabase(dbName); + auto status = grid.catalogCache()->getDatabase(txn, dbName); if (!status.isOK()) { return appendEmptyResultSet( result, status.getStatus(), dbName + ".$cmd.listCollections"); @@ -1408,7 +1412,7 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(dbName)); + auto conf = uassertStatusOK(grid.catalogCache()->getDatabase(txn, dbName)); bool retval = passthrough(conf, cmdObj, result); const auto shard = grid.shardRegistry()->getShard(conf->getPrimaryId()); diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp index 4b8d17b4c8c..91b10dd1db0 100644 --- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp +++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp @@ -62,7 +62,8 @@ BSONObj RunOnAllShardsCommand::specialErrorHandler(const std::string& server, return originalResult; } -void RunOnAllShardsCommand::getShardIds(const std::string& db, +void RunOnAllShardsCommand::getShardIds(OperationContext* txn, + const std::string& db, BSONObj& cmdObj, std::vector<ShardId>& shardIds) { grid.shardRegistry()->getAllShardIds(&shardIds); @@ -77,11 +78,11 @@ bool RunOnAllShardsCommand::run(OperationContext* txn, LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << cmdObj; if (_implicitCreateDb) { - uassertStatusOK(grid.implicitCreateDb(dbName)); + uassertStatusOK(grid.implicitCreateDb(txn, dbName)); } std::vector<ShardId> shardIds; - getShardIds(dbName, cmdObj, shardIds); + getShardIds(txn, dbName, cmdObj, shardIds); std::list<std::shared_ptr<Future::CommandResult>> futures; for (const ShardId& shardId : shardIds) { @@ -108,7 +109,7 @@ bool RunOnAllShardsCommand::run(OperationContext* txn, ++futuresit, ++shardIdsIt) { std::shared_ptr<Future::CommandResult> res = *futuresit; - if (res->join()) { + if (res->join(txn)) { // success :) BSONObj result = res->result(); results.emplace_back(*shardIdsIt, result); diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.h b/src/mongo/s/commands/run_on_all_shards_cmd.h index 149887864a7..b1601f9ef90 100644 --- a/src/mongo/s/commands/run_on_all_shards_cmd.h +++ b/src/mongo/s/commands/run_on_all_shards_cmd.h @@ -81,7 +81,8 @@ public: const BSONObj& originalResult) const; // The default implementation uses all shards. - virtual void getShardIds(const std::string& db, + virtual void getShardIds(OperationContext* txn, + const std::string& db, BSONObj& cmdObj, std::vector<ShardId>& shardIds); diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index 3ae784329a0..f173bfe05bd 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -62,10 +62,10 @@ using std::string; using std::unique_ptr; using std::vector; -CollectionInfo::CollectionInfo(const CollectionType& coll) { +CollectionInfo::CollectionInfo(OperationContext* txn, const CollectionType& coll) { _dropped = coll.getDropped(); - shard(new ChunkManager(coll)); + shard(txn, new ChunkManager(coll)); _dirty = false; } @@ -78,9 +78,9 @@ void CollectionInfo::resetCM(ChunkManager* cm) { _cm.reset(cm); } -void CollectionInfo::shard(ChunkManager* manager) { +void CollectionInfo::shard(OperationContext* txn, ChunkManager* manager) { // Do this *first* so we're invisible to everyone else - manager->loadExistingRanges(nullptr); + manager->loadExistingRanges(txn, nullptr); // // Collections with no chunks are unsharded, no matter what the collections entry says @@ -111,7 +111,7 @@ void CollectionInfo::useChunkManager(ChunkManagerPtr manager) { _dropped = false; } -void CollectionInfo::save(const string& ns) { +void CollectionInfo::save(OperationContext* txn, const string& ns) { CollectionType coll; coll.setNs(NamespaceString{ns}); @@ -130,7 +130,7 @@ void CollectionInfo::save(const string& ns) { coll.setUpdatedAt(Date_t::now()); } - uassertStatusOK(grid.catalogManager()->updateCollection(ns, coll)); + uassertStatusOK(grid.catalogManager(txn)->updateCollection(ns, coll)); _dirty = false; } @@ -177,7 +177,7 @@ void DBConfig::invalidateNs(const std::string& ns) { } } -void DBConfig::enableSharding() { +void DBConfig::enableSharding(OperationContext* txn) { verify(_name != "config"); stdx::lock_guard<stdx::mutex> lk(_lock); @@ -186,10 +186,10 @@ void DBConfig::enableSharding() { } _shardingEnabled = true; - _save(); + _save(txn); } -bool DBConfig::removeSharding(const string& ns) { +bool DBConfig::removeSharding(OperationContext* txn, const string& ns) { if (!_shardingEnabled) { warning() << "could not remove sharding for collection " << ns << ", sharding not enabled for db"; @@ -211,7 +211,7 @@ bool DBConfig::removeSharding(const string& ns) { } ci.unshard(); - _save(false, true); + _save(txn, false, true); return true; } @@ -255,21 +255,23 @@ void DBConfig::getChunkManagerOrPrimary(const string& ns, } -ChunkManagerPtr DBConfig::getChunkManagerIfExists(const string& ns, +ChunkManagerPtr DBConfig::getChunkManagerIfExists(OperationContext* txn, + const string& ns, bool shouldReload, bool forceReload) { // Don't report exceptions here as errors in GetLastError LastError::Disabled ignoreForGLE(&LastError::get(cc())); try { - return getChunkManager(ns, shouldReload, forceReload); + return getChunkManager(txn, ns, shouldReload, forceReload); } catch (AssertionException& e) { warning() << "chunk manager not found for " << ns << causedBy(e); return ChunkManagerPtr(); } } -std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns, +std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn, + const string& ns, bool shouldReload, bool forceReload) { BSONObj key; @@ -282,7 +284,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns, bool earlyReload = !_collections[ns].isSharded() && (shouldReload || forceReload); if (earlyReload) { // This is to catch cases where there this is a new sharded collection - _load(); + _load(txn); } CollectionInfo& ci = _collections[ns]; @@ -309,7 +311,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns, // currently vector<ChunkType> newestChunk; if (oldVersion.isSet() && !forceReload) { - uassertStatusOK(grid.catalogManager()->getChunks( + uassertStatusOK(grid.catalogManager(txn)->getChunks( BSON(ChunkType::ns(ns)), BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, &newestChunk)); if (!newestChunk.empty()) { @@ -356,13 +358,13 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns, tempChunkManager.reset(new ChunkManager( oldManager->getns(), oldManager->getShardKeyPattern(), oldManager->isUnique())); - tempChunkManager->loadExistingRanges(oldManager.get()); + tempChunkManager->loadExistingRanges(txn, oldManager.get()); if (tempChunkManager->numChunks() == 0) { // Maybe we're not sharded any more, so do a full reload - reload(); + reload(txn); - return getChunkManager(ns, false); + return getChunkManager(txn, ns, false); } } @@ -411,21 +413,21 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(const string& ns, return ci.getCM(); } -void DBConfig::setPrimary(const std::string& s) { +void DBConfig::setPrimary(OperationContext* txn, const std::string& s) { const auto shard = grid.shardRegistry()->getShard(s); stdx::lock_guard<stdx::mutex> lk(_lock); _primaryId = shard->getId(); - _save(); + _save(txn); } -bool DBConfig::load() { +bool DBConfig::load(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_lock); - return _load(); + return _load(txn); } -bool DBConfig::_load() { - StatusWith<DatabaseType> status = grid.catalogManager()->getDatabase(_name); +bool DBConfig::_load(OperationContext* txn) { + StatusWith<DatabaseType> status = grid.catalogManager(txn)->getDatabase(_name); if (status == ErrorCodes::DatabaseNotFound) { return false; } @@ -440,7 +442,7 @@ bool DBConfig::_load() { // Load all collections vector<CollectionType> collections; - uassertStatusOK(grid.catalogManager()->getCollections(&_name, &collections)); + uassertStatusOK(grid.catalogManager(txn)->getCollections(&_name, &collections)); int numCollsErased = 0; int numCollsSharded = 0; @@ -450,7 +452,7 @@ bool DBConfig::_load() { _collections.erase(coll.getNs().ns()); numCollsErased++; } else { - _collections[coll.getNs().ns()] = CollectionInfo(coll); + _collections[coll.getNs().ns()] = CollectionInfo(txn, coll); numCollsSharded++; } } @@ -461,14 +463,14 @@ bool DBConfig::_load() { return true; } -void DBConfig::_save(bool db, bool coll) { +void DBConfig::_save(OperationContext* txn, bool db, bool coll) { if (db) { DatabaseType dbt; dbt.setName(_name); dbt.setPrimary(_primaryId); dbt.setSharded(_shardingEnabled); - uassertStatusOK(grid.catalogManager()->updateDatabase(_name, dbt)); + uassertStatusOK(grid.catalogManager(txn)->updateDatabase(_name, dbt)); } if (coll) { @@ -477,17 +479,17 @@ void DBConfig::_save(bool db, bool coll) { continue; } - i->second.save(i->first); + i->second.save(txn, i->first); } } } -bool DBConfig::reload() { +bool DBConfig::reload(OperationContext* txn) { bool successful = false; { stdx::lock_guard<stdx::mutex> lk(_lock); - successful = _load(); + successful = _load(txn); } // If we aren't successful loading the database entry, we don't want to keep the stale @@ -508,14 +510,14 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) { */ log() << "DBConfig::dropDatabase: " << _name; - grid.catalogManager()->logChange( - txn->getClient()->clientAddress(true), "dropDatabase.start", _name, BSONObj()); + grid.catalogManager(txn) + ->logChange(txn->getClient()->clientAddress(true), "dropDatabase.start", _name, BSONObj()); // 1 grid.catalogCache()->invalidate(_name); - Status result = grid.catalogManager()->remove( - DatabaseType::ConfigNS, BSON(DatabaseType::name(_name)), 0, NULL); + Status result = grid.catalogManager(txn) + ->remove(DatabaseType::ConfigNS, BSON(DatabaseType::name(_name)), 0, NULL); if (!result.isOK()) { errmsg = result.reason(); log() << "could not drop '" << _name << "': " << errmsg; @@ -570,8 +572,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) { LOG(1) << "\t dropped primary db for: " << _name; - grid.catalogManager()->logChange( - txn->getClient()->clientAddress(true), "dropDatabase", _name, BSONObj()); + grid.catalogManager(txn) + ->logChange(txn->getClient()->clientAddress(true), "dropDatabase", _name, BSONObj()); return true; } @@ -604,11 +606,11 @@ bool DBConfig::_dropShardedCollections(OperationContext* txn, i->second.getCM()->getAllShardIds(&shardIds); - uassertStatusOK(grid.catalogManager()->dropCollection(txn, NamespaceString(i->first))); + uassertStatusOK(grid.catalogManager(txn)->dropCollection(txn, NamespaceString(i->first))); // We should warn, but it's not a fatal error if someone else reloaded the db/coll as // unsharded in the meantime - if (!removeSharding(i->first)) { + if (!removeSharding(txn, i->first)) { warning() << "collection " << i->first << " was reloaded as unsharded before drop completed" << " during drop of all collections"; @@ -648,8 +650,9 @@ void DBConfig::getAllShardedCollections(set<string>& namespaces) { /* --- ConfigServer ---- */ -void ConfigServer::reloadSettings() { - auto chunkSizeResult = grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey); +void ConfigServer::reloadSettings(OperationContext* txn) { + auto catalogManager = grid.catalogManager(txn); + auto chunkSizeResult = catalogManager->getGlobalSettings(SettingsType::ChunkSizeDocKey); if (chunkSizeResult.isOK()) { const int csize = chunkSizeResult.getValue().getChunkSizeMB(); LOG(1) << "Found MaxChunkSize: " << csize; @@ -660,10 +663,10 @@ void ConfigServer::reloadSettings() { } else if (chunkSizeResult.getStatus() == ErrorCodes::NoMatchingDocument) { const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024); Status result = - grid.catalogManager()->insert(SettingsType::ConfigNS, - BSON(SettingsType::key(SettingsType::ChunkSizeDocKey) - << SettingsType::chunkSizeMB(chunkSize)), - NULL); + grid.catalogManager(txn)->insert(SettingsType::ConfigNS, + BSON(SettingsType::key(SettingsType::ChunkSizeDocKey) + << SettingsType::chunkSizeMB(chunkSize)), + NULL); if (!result.isOK()) { warning() << "couldn't set chunkSize on config db" << causedBy(result); } @@ -672,7 +675,8 @@ void ConfigServer::reloadSettings() { } // indexes - Status result = clusterCreateIndex(ChunkType::ConfigNS, + Status result = clusterCreateIndex(txn, + ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::min() << 1), true, // unique NULL); @@ -682,6 +686,7 @@ void ConfigServer::reloadSettings() { } result = clusterCreateIndex( + txn, ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1), true, // unique @@ -691,7 +696,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create ns_1_shard_1_min_1 index on config db" << causedBy(result); } - result = clusterCreateIndex(ChunkType::ConfigNS, + result = clusterCreateIndex(txn, + ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1), true, // unique NULL); @@ -700,7 +706,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result); } - result = clusterCreateIndex(ShardType::ConfigNS, + result = clusterCreateIndex(txn, + ShardType::ConfigNS, BSON(ShardType::host() << 1), true, // unique NULL); @@ -709,7 +716,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create host_1 index on config db" << causedBy(result); } - result = clusterCreateIndex(LocksType::ConfigNS, + result = clusterCreateIndex(txn, + LocksType::ConfigNS, BSON(LocksType::lockID() << 1), false, // unique NULL); @@ -718,7 +726,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create lock id index on config db" << causedBy(result); } - result = clusterCreateIndex(LocksType::ConfigNS, + result = clusterCreateIndex(txn, + LocksType::ConfigNS, BSON(LocksType::state() << 1 << LocksType::process() << 1), false, // unique NULL); @@ -727,7 +736,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create state and process id index on config db" << causedBy(result); } - result = clusterCreateIndex(LockpingsType::ConfigNS, + result = clusterCreateIndex(txn, + LockpingsType::ConfigNS, BSON(LockpingsType::ping() << 1), false, // unique NULL); @@ -736,7 +746,8 @@ void ConfigServer::reloadSettings() { warning() << "couldn't create lockping ping time index on config db" << causedBy(result); } - result = clusterCreateIndex(TagsType::ConfigNS, + result = clusterCreateIndex(txn, + TagsType::ConfigNS, BSON(TagsType::ns() << 1 << TagsType::min() << 1), true, // unique NULL); @@ -749,6 +760,7 @@ void ConfigServer::reloadSettings() { void ConfigServer::replicaSetChange(const string& setName, const string& newConnectionString) { // This is run in it's own thread. Exceptions escaping would result in a call to terminate. Client::initThread("replSetChange"); + auto txn = cc().makeOperationContext(); try { std::shared_ptr<Shard> s = grid.shardRegistry()->lookupRSName(setName); @@ -757,13 +769,13 @@ void ConfigServer::replicaSetChange(const string& setName, const string& newConn return; } - Status result = grid.catalogManager()->update( - ShardType::ConfigNS, - BSON(ShardType::name(s->getId())), - BSON("$set" << BSON(ShardType::host(newConnectionString))), - false, // upsert - false, // multi - NULL); + Status result = grid.catalogManager(txn.get()) + ->update(ShardType::ConfigNS, + BSON(ShardType::name(s->getId())), + BSON("$set" << BSON(ShardType::host(newConnectionString))), + false, // upsert + false, // multi + NULL); if (!result.isOK()) { error() << "RSChangeWatcher: could not update config db for set: " << setName << " to: " << newConnectionString << ": " << result.reason(); diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index 36d03949933..a779c8ee70c 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -48,7 +48,7 @@ struct CollectionInfo { _dropped = false; } - CollectionInfo(const CollectionType& in); + CollectionInfo(OperationContext* txn, const CollectionType& in); ~CollectionInfo(); bool isSharded() const { @@ -61,7 +61,7 @@ struct CollectionInfo { void resetCM(ChunkManager* cm); - void shard(ChunkManager* cm); + void shard(OperationContext* txn, ChunkManager* cm); void unshard(); @@ -73,7 +73,7 @@ struct CollectionInfo { return _dropped; } - void save(const std::string& ns); + void save(OperationContext* txn, const std::string& ns); void useChunkManager(std::shared_ptr<ChunkManager> manager); @@ -125,12 +125,12 @@ public: */ void invalidateNs(const std::string& ns); - void enableSharding(); + void enableSharding(OperationContext* txn); /** @return true if there was sharding info to remove */ - bool removeSharding(const std::string& ns); + bool removeSharding(OperationContext* txn, const std::string& ns); /** * @return whether or not the 'ns' collection is partitioned @@ -143,10 +143,12 @@ public: std::shared_ptr<ChunkManager>& manager, std::shared_ptr<Shard>& primary); - std::shared_ptr<ChunkManager> getChunkManager(const std::string& ns, + std::shared_ptr<ChunkManager> getChunkManager(OperationContext* txn, + const std::string& ns, bool reload = false, bool forceReload = false); - std::shared_ptr<ChunkManager> getChunkManagerIfExists(const std::string& ns, + std::shared_ptr<ChunkManager> getChunkManagerIfExists(OperationContext* txn, + const std::string& ns, bool reload = false, bool forceReload = false); @@ -155,10 +157,10 @@ public: */ const ShardId& getShardId(const std::string& ns); - void setPrimary(const std::string& s); + void setPrimary(OperationContext* txn, const std::string& s); - bool load(); - bool reload(); + bool load(OperationContext* txn); + bool reload(OperationContext* txn); bool dropDatabase(OperationContext*, std::string& errmsg); @@ -173,9 +175,9 @@ protected: std::set<ShardId>& shardIds, std::string& errmsg); - bool _load(); + bool _load(OperationContext* txn); - void _save(bool db = true, bool coll = true); + void _save(OperationContext* txn, bool db = true, bool coll = true); // Name of the database which this entry caches const std::string _name; @@ -198,7 +200,7 @@ protected: class ConfigServer { public: - static void reloadSettings(); + static void reloadSettings(OperationContext* txn); static void replicaSetChange(const std::string& setName, const std::string& newConnectionString); diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp index e7ec42da278..bd399b3f832 100644 --- a/src/mongo/s/d_merge.cpp +++ b/src/mongo/s/d_merge.cpp @@ -53,7 +53,8 @@ using std::shared_ptr; using std::string; using mongoutils::str::stream; -static Status runApplyOpsCmd(const std::vector<ChunkType>&, +static Status runApplyOpsCmd(OperationContext* txn, + const std::vector<ChunkType>&, const ChunkVersion&, const ChunkVersion&); @@ -70,7 +71,8 @@ bool mergeChunks(OperationContext* txn, // Get the distributed lock string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey; - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(nss.ns(), whyMessage); + auto scopedDistLock = + grid.catalogManager(txn)->getDistLockManager()->lock(nss.ns(), whyMessage); if (!scopedDistLock.isOK()) { *errMsg = stream() << "could not acquire collection lock for " << nss.ns() @@ -229,7 +231,7 @@ bool mergeChunks(OperationContext* txn, // // Run apply ops command // - Status applyOpsStatus = runApplyOpsCmd(chunksToMerge, shardVersion, mergeVersion); + Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion); if (!applyOpsStatus.isOK()) { warning() << applyOpsStatus; return false; @@ -253,8 +255,8 @@ bool mergeChunks(OperationContext* txn, BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion); - grid.catalogManager()->logChange( - txn->getClient()->clientAddress(true), "merge", nss.ns(), mergeLogEntry); + grid.catalogManager(txn) + ->logChange(txn->getClient()->clientAddress(true), "merge", nss.ns(), mergeLogEntry); return true; } @@ -330,7 +332,8 @@ BSONArray buildOpPrecond(const string& ns, return preCond.arr(); } -Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge, +Status runApplyOpsCmd(OperationContext* txn, + const std::vector<ChunkType>& chunksToMerge, const ChunkVersion& currShardVersion, const ChunkVersion& newMergedVersion) { BSONArrayBuilder updatesB; @@ -354,6 +357,6 @@ Status runApplyOpsCmd(const std::vector<ChunkType>& chunksToMerge, } BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion); - return grid.catalogManager()->applyChunkOpsDeprecated(updatesB.arr(), preCond); + return grid.catalogManager(txn)->applyChunkOpsDeprecated(updatesB.arr(), preCond); } } diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 8bf5c089569..f1024978064 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -433,7 +433,7 @@ public: string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey << ") in " << ns); - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage); + auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock(ns, whyMessage); if (!scopedDistLock.isOK()) { errmsg = stream() << "could not acquire collection lock for " << ns @@ -447,8 +447,8 @@ public: BSONObj chunkInfo = BSON("min" << min << "max" << max << "from" << fromShardName << "to" << toShardName); - grid.catalogManager()->logChange( - txn->getClient()->clientAddress(true), "moveChunk.start", ns, chunkInfo); + grid.catalogManager(txn) + ->logChange(txn->getClient()->clientAddress(true), "moveChunk.start", ns, chunkInfo); // Always refresh our metadata remotely ChunkVersion origShardVersion; @@ -578,7 +578,7 @@ public: recvChunkStartBuilder.append("max", max); recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern); recvChunkStartBuilder.append("configServer", - ShardingState::get(txn)->getConfigServer()); + ShardingState::get(txn)->getConfigServer(txn)); recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle); // Follow the same convention in moveChunk. @@ -898,11 +898,11 @@ public: } applyOpsStatus = - grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr()); + grid.catalogManager(txn)->applyChunkOpsDeprecated(updates.arr(), preCond.arr()); if (MONGO_FAIL_POINT(failMigrationApplyOps)) { throw SocketException(SocketException::RECV_ERROR, - ShardingState::get(txn)->getConfigServer()); + ShardingState::get(txn)->getConfigServer(txn)); } } catch (const DBException& ex) { warning() << ex << migrateLog; @@ -950,11 +950,11 @@ public: // we assume that if that mod made it to the config, the applyOps was successful try { std::vector<ChunkType> newestChunk; - Status status = grid.catalogManager()->getChunks( - BSON(ChunkType::ns(ns)), - BSON(ChunkType::DEPRECATED_lastmod() << -1), - 1, - &newestChunk); + Status status = grid.catalogManager(txn) + ->getChunks(BSON(ChunkType::ns(ns)), + BSON(ChunkType::DEPRECATED_lastmod() << -1), + 1, + &newestChunk); uassertStatusOK(status); ChunkVersion checkVersion; @@ -989,7 +989,7 @@ public: commitInfo.appendElements(res["counts"].Obj()); } - grid.catalogManager()->logChange( + grid.catalogManager(txn)->logChange( txn->getClient()->clientAddress(true), "moveChunk.commit", ns, commitInfo.obj()); } diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index 1722899f7ab..0c6dd0f9ff5 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -603,7 +603,7 @@ public: // Initialize our current shard name in the shard state if needed shardingState->gotShardName(shardName); - auto configLocStatus = ConnectionString::parse(shardingState->getConfigServer()); + auto configLocStatus = ConnectionString::parse(shardingState->getConfigServer(txn)); if (!configLocStatus.isOK()) { warning() << configLocStatus.getStatus(); return false; @@ -617,7 +617,7 @@ public: string whyMessage(str::stream() << "splitting chunk [" << minKey << ", " << maxKey << ") in " << ns); - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage); + auto scopedDistLock = grid.catalogManager(txn)->getDistLockManager()->lock(ns, whyMessage); if (!scopedDistLock.isOK()) { errmsg = str::stream() << "could not acquire collection lock for " << ns @@ -787,7 +787,7 @@ public: // 4. apply the batch of updates to remote and local metadata // Status applyOpsStatus = - grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr()); + grid.catalogManager(txn)->applyChunkOpsDeprecated(updates.arr(), preCond.arr()); if (!applyOpsStatus.isOK()) { return appendCommandStatus(result, applyOpsStatus); } @@ -824,8 +824,8 @@ public: appendShortVersion(logDetail.subobjStart("left"), *newChunks[0]); appendShortVersion(logDetail.subobjStart("right"), *newChunks[1]); - grid.catalogManager()->logChange( - txn->getClient()->clientAddress(true), "split", ns, logDetail.obj()); + grid.catalogManager(txn) + ->logChange(txn->getClient()->clientAddress(true), "split", ns, logDetail.obj()); } else { BSONObj beforeDetailObj = logDetail.obj(); BSONObj firstDetailObj = beforeDetailObj.getOwned(); @@ -838,7 +838,7 @@ public: chunkDetail.append("of", newChunksSize); appendShortVersion(chunkDetail.subobjStart("chunk"), *newChunks[i]); - grid.catalogManager()->logChange( + grid.catalogManager(txn)->logChange( txn->getClient()->clientAddress(true), "multi-split", ns, chunkDetail.obj()); } } diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index 594bef917fe..402df200a22 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -150,7 +150,7 @@ public: ShardingState* shardingState = ShardingState::get(txn); if (shardingState->enabled()) { - result.append("configServer", shardingState->getConfigServer()); + result.append("configServer", shardingState->getConfigServer(txn)); } else { result.append("configServer", ""); } @@ -213,7 +213,7 @@ public: Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X); OldClientContext ctx(txn, dbname); - ShardingState::get(txn)->appendInfo(result); + ShardingState::get(txn)->appendInfo(txn, result); return true; } diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 6c34f735607..db779ff4e17 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -60,16 +60,17 @@ void Grid::init(std::unique_ptr<CatalogManager> catalogManager, _cursorManager = stdx::make_unique<ClusterCursorManager>(); } -StatusWith<std::shared_ptr<DBConfig>> Grid::implicitCreateDb(const std::string& dbName) { - auto status = catalogCache()->getDatabase(dbName); +StatusWith<std::shared_ptr<DBConfig>> Grid::implicitCreateDb(OperationContext* txn, + const std::string& dbName) { + auto status = catalogCache()->getDatabase(txn, dbName); if (status.isOK()) { return status; } if (status == ErrorCodes::DatabaseNotFound) { - auto statusCreateDb = catalogManager()->createDatabase(dbName); + auto statusCreateDb = catalogManager(txn)->createDatabase(dbName); if (statusCreateDb.isOK() || statusCreateDb == ErrorCodes::NamespaceExists) { - return catalogCache()->getDatabase(dbName); + return catalogCache()->getDatabase(txn, dbName); } return statusCreateDb; @@ -103,8 +104,9 @@ bool Grid::shouldBalance(const SettingsType& balancerSettings) const { return true; } -bool Grid::getConfigShouldBalance() const { - auto balSettingsResult = grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); +bool Grid::getConfigShouldBalance(OperationContext* txn) const { + auto balSettingsResult = + grid.catalogManager(txn)->getGlobalSettings(SettingsType::BalancerDocKey); if (!balSettingsResult.isOK()) { warning() << balSettingsResult.getStatus(); return false; diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 7136e69b5c8..430ac7a29f6 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -40,6 +40,7 @@ class BSONObj; class CatalogCache; class CatalogManager; class DBConfig; +class OperationContext; class SettingsType; class ShardRegistry; template <typename T> @@ -68,7 +69,8 @@ public: /** * Implicitly creates the specified database as non-sharded. */ - StatusWith<std::shared_ptr<DBConfig>> implicitCreateDb(const std::string& dbName); + StatusWith<std::shared_ptr<DBConfig>> implicitCreateDb(OperationContext* txn, + const std::string& dbName); /** * @return true if shards and config servers are allowed to use 'localhost' in address @@ -89,11 +91,12 @@ public: /** * Returns true if the config server settings indicate that the balancer should be active. */ - bool getConfigShouldBalance() const; + bool getConfigShouldBalance(OperationContext* txn) const; - CatalogManager* catalogManager() { + CatalogManager* catalogManager(OperationContext* txn) { return _catalogManager.get(); } + CatalogCache* catalogCache() { return _catalogCache.get(); } diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index d450899031a..44bc5be18c2 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -85,7 +85,7 @@ public: /** * Returns a ShardEndpoint for the doc from the mock ranges */ - Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const { + Status targetInsert(OperationContext* txn, const BSONObj& doc, ShardEndpoint** endpoint) const { std::vector<ShardEndpoint*> endpoints; Status status = targetQuery(doc, &endpoints); if (!status.isOK()) @@ -99,7 +99,8 @@ public: * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - Status targetUpdate(const BatchedUpdateDocument& updateDoc, + Status targetUpdate(OperationContext* txn, + const BatchedUpdateDocument& updateDoc, std::vector<ShardEndpoint*>* endpoints) const { return targetQuery(updateDoc.getQuery(), endpoints); } @@ -108,7 +109,8 @@ public: * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - Status targetDelete(const BatchedDeleteDocument& deleteDoc, + Status targetDelete(OperationContext* txn, + const BatchedDeleteDocument& deleteDoc, std::vector<ShardEndpoint*>* endpoints) const { return targetQuery(deleteDoc.getQuery(), endpoints); } @@ -138,7 +140,7 @@ public: // No-op } - Status refreshIfNeeded(bool* wasChanged) { + Status refreshIfNeeded(OperationContext* txn, bool* wasChanged) { // No-op if (wasChanged) *wasChanged = false; diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 0b8e513f7fa..7aed94536ea 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -40,6 +40,7 @@ namespace mongo { +class OperationContext; struct ShardEndpoint; /** @@ -81,14 +82,17 @@ public: * * Returns !OK with message if document could not be targeted for other reasons. */ - virtual Status targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const = 0; + virtual Status targetInsert(OperationContext* txn, + const BSONObj& doc, + ShardEndpoint** endpoint) const = 0; /** * Returns a vector of ShardEndpoints for a potentially multi-shard update. * * Returns OK and fills the endpoints; returns a status describing the error otherwise. */ - virtual Status targetUpdate(const BatchedUpdateDocument& updateDoc, + virtual Status targetUpdate(OperationContext* txn, + const BatchedUpdateDocument& updateDoc, std::vector<ShardEndpoint*>* endpoints) const = 0; /** @@ -96,7 +100,8 @@ public: * * Returns OK and fills the endpoints; returns a status describing the error otherwise. */ - virtual Status targetDelete(const BatchedDeleteDocument& deleteDoc, + virtual Status targetDelete(OperationContext* txn, + const BatchedDeleteDocument& deleteDoc, std::vector<ShardEndpoint*>* endpoints) const = 0; /** @@ -141,7 +146,7 @@ public: * NOTE: This function may block for shared resources or network calls. * Returns !OK with message if could not refresh */ - virtual Status refreshIfNeeded(bool* wasChanged) = 0; + virtual Status refreshIfNeeded(OperationContext* txn, bool* wasChanged) = 0; }; /** diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 0daabd89073..d3f03681a1a 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -200,7 +200,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, std::vector<BSONObj>* results) { invariant(results); - auto dbConfig = grid.catalogCache()->getDatabase(query.nss().db().toString()); + auto dbConfig = grid.catalogCache()->getDatabase(txn, query.nss().db().toString()); if (!dbConfig.isOK()) { return dbConfig.getStatus(); } @@ -229,7 +229,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, << retries << " of " << kMaxStaleConfigRetries << ": " << status.reason(); invariant(chunkManager); - chunkManager = chunkManager->reload(); + chunkManager = chunkManager->reload(txn); } return {ErrorCodes::StaleShardVersion, diff --git a/src/mongo/s/request.cpp b/src/mongo/s/request.cpp index 4e5b5626730..1ea079f3a43 100644 --- a/src/mongo/s/request.cpp +++ b/src/mongo/s/request.cpp @@ -80,7 +80,7 @@ void Request::init() { _didInit = true; } -void Request::process(int attempt) { +void Request::process(OperationContext* txn, int attempt) { init(); int op = _m.operation(); verify(op > dbMsg); @@ -108,15 +108,15 @@ void Request::process(int attempt) { << ") for $cmd type ns - can only be 1 or -1", n == 1 || n == -1); - Strategy::clientCommandOp(*this); + Strategy::clientCommandOp(txn, *this); } else { - Strategy::queryOp(*this); + Strategy::queryOp(txn, *this); } } else if (op == dbGetMore) { - Strategy::getMore(*this); + Strategy::getMore(txn, *this); globalOpCounters.gotOp(op, iscmd); } else { - Strategy::writeOp(op, *this); + Strategy::writeOp(txn, op, *this); // globalOpCounters are handled by write commands. } diff --git a/src/mongo/s/request.h b/src/mongo/s/request.h index cee65b0dce2..6292610f971 100644 --- a/src/mongo/s/request.h +++ b/src/mongo/s/request.h @@ -35,6 +35,7 @@ namespace mongo { class Client; +class OperationContext; class Request { MONGO_DISALLOW_COPYING(Request); @@ -72,7 +73,7 @@ public: return _p; } - void process(int attempt = 0); + void process(OperationContext* txn, int attempt = 0); void init(); diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp index 661bc785fe9..275ca7ae3d2 100644 --- a/src/mongo/s/s_only.cpp +++ b/src/mongo/s/s_only.cpp @@ -144,7 +144,8 @@ void Command::execCommandClientBasic(OperationContext* txn, appendCommandStatus(result, ok, errmsg); } -void Command::runAgainstRegistered(const char* ns, +void Command::runAgainstRegistered(OperationContext* txn, + const char* ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions) { @@ -165,8 +166,7 @@ void Command::runAgainstRegistered(const char* ns, return; } - auto txn = cc().makeOperationContext(); - execCommandClientBasic(txn.get(), c, cc(), queryOptions, ns, jsobj, anObjBuilder); + execCommandClientBasic(txn, c, cc(), queryOptions, ns, jsobj, anObjBuilder); } void Command::registerError(OperationContext* txn, const DBException& exception) {} diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index d74f4823aa7..7467d763ec8 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -130,10 +130,11 @@ public: virtual void process(Message& m, AbstractMessagingPort* p) { verify(p); Request r(m, p); + auto txn = cc().makeOperationContext(); try { r.init(); - r.process(); + r.process(txn.get()); } catch (const AssertionException& ex) { LOG(ex.isUserAssertion() ? 1 : 0) << "Assertion failed" << " while processing " << opToString(m.operation()) @@ -169,8 +170,13 @@ public: void start(const MessageServer::Options& opts) { balancer.go(); cursorCache.startTimeoutThread(); + UserCacheInvalidator cacheInvalidatorThread(getGlobalAuthorizationManager()); - cacheInvalidatorThread.go(); + { + auto txn = cc().makeOperationContext(); + cacheInvalidatorThread.initialize(txn.get()); + cacheInvalidatorThread.go(); + } PeriodicTask::startRunningPeriodicTasks(); @@ -190,13 +196,14 @@ DBClientBase* createDirectClient(OperationContext* txn) { using namespace mongo; -static Status initializeSharding(bool doUpgrade) { +static Status initializeSharding(OperationContext* txn, bool doUpgrade) { Status status = initializeGlobalShardingState(mongosGlobalParams.configdbs); if (!status.isOK()) { return status; } - status = grid.catalogManager()->checkAndUpgrade(!doUpgrade); + auto catalogManager = grid.catalogManager(txn); + status = catalogManager->checkAndUpgrade(!doUpgrade); if (!status.isOK()) { return status; } @@ -205,7 +212,7 @@ static Status initializeSharding(bool doUpgrade) { return Status::OK(); } - status = grid.catalogManager()->startup(); + status = catalogManager->startup(); if (!status.isOK()) { return status; } @@ -231,16 +238,19 @@ static ExitCode runMongosServer(bool doUpgrade) { dbexit(EXIT_BADOPTIONS); } - Status status = initializeSharding(doUpgrade); - if (!status.isOK()) { - error() << "Error initializing sharding system: " << status; - return EXIT_SHARDING_ERROR; - } - if (doUpgrade) { - return EXIT_CLEAN; - } + { + auto txn = cc().makeOperationContext(); + Status status = initializeSharding(txn.get(), doUpgrade); + if (!status.isOK()) { + error() << "Error initializing sharding system: " << status; + return EXIT_SHARDING_ERROR; + } + if (doUpgrade) { + return EXIT_CLEAN; + } - ConfigServer::reloadSettings(); + ConfigServer::reloadSettings(txn.get()); + } #if !defined(_WIN32) mongo::signalForkSuccess(); @@ -255,7 +265,7 @@ static ExitCode runMongosServer(bool doUpgrade) { web.detach(); } - status = getGlobalAuthorizationManager()->initialize(NULL); + Status status = getGlobalAuthorizationManager()->initialize(NULL); if (!status.isOK()) { error() << "Initializing authorization data failed: " << status; return EXIT_SHARDING_ERROR; @@ -434,7 +444,19 @@ void mongo::signalShutdown() { void mongo::exitCleanly(ExitCode code) { // TODO: do we need to add anything? - grid.catalogManager()->shutDown(); + { + Client& client = cc(); + ServiceContext::UniqueOperationContext uniqueTxn; + OperationContext* txn = client.getOperationContext(); + if (!txn) { + uniqueTxn = client.makeOperationContext(); + txn = uniqueTxn.get(); + } + + auto catalogMgr = grid.catalogManager(txn); + catalogMgr->shutDown(); + } + mongo::dbexit(code); } diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 817a5d00682..c3475ad36f7 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -87,7 +87,7 @@ static bool _isSystemIndexes(const char* ns) { /** * Returns true if request is a query for sharded indexes. */ -static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { +static bool doShardedIndexQuery(OperationContext* txn, Request& r, const QuerySpec& qSpec) { // Extract the ns field from the query, which may be embedded within the "query" or // "$query" field. auto nsField = qSpec.filter()["ns"]; @@ -96,7 +96,7 @@ static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { } const NamespaceString indexNSSQuery(nsField.str()); - auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, indexNSSQuery.db().toString()); if (!status.isOK()) { return false; } @@ -147,7 +147,7 @@ static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { return true; } -void Strategy::queryOp(Request& r) { +void Strategy::queryOp(OperationContext* txn, Request& r) { verify(!NamespaceString(r.getns()).isCommand()); Timer queryTimer; @@ -157,7 +157,7 @@ void Strategy::queryOp(Request& r) { QueryMessage q(r.d()); NamespaceString ns(q.ns); - ClientBasic* client = ClientBasic::getCurrent(); + ClientBasic* client = txn->getClient(); AuthorizationSession* authSession = AuthorizationSession::get(client); Status status = authSession->checkAuthForQuery(ns, q.query); audit::logQueryAuthzCheck(client, ns, q.query, status.code()); @@ -237,7 +237,7 @@ void Strategy::queryOp(Request& r) { StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query); uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK()); - if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) { + if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, r, qSpec)) { return; } @@ -246,7 +246,7 @@ void Strategy::queryOp(Request& r) { // TODO: Move out to Request itself, not strategy based try { - cursor->init(); + cursor->init(txn); if (qSpec.isExplain()) { BSONObjBuilder explain_builder; @@ -317,7 +317,7 @@ void Strategy::queryOp(Request& r) { } } -void Strategy::clientCommandOp(Request& r) { +void Strategy::clientCommandOp(OperationContext* txn, Request& r) { QueryMessage q(r.d()); LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn @@ -333,7 +333,7 @@ void Strategy::clientCommandOp(Request& r) { // Regular queries are handled in strategy_shard.cpp verify(nss.isCommand() || nss.isSpecialCommand()); - if (handleSpecialNamespaces(r, q)) + if (handleSpecialNamespaces(txn, r, q)) return; int loops = 5; @@ -367,7 +367,7 @@ void Strategy::clientCommandOp(Request& r) { } } - Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions); + Command::runAgainstRegistered(txn, q.ns, cmdObj, builder, q.queryOptions); BSONObj x = builder.done(); replyToQuery(0, r.p(), r.m(), x); return; @@ -383,9 +383,9 @@ void Strategy::clientCommandOp(Request& r) { if (staleNS.size() == 0) staleNS = q.ns; - ShardConnection::checkMyConnectionVersions(staleNS); + ShardConnection::checkMyConnectionVersions(txn, staleNS); if (loops < 4) - versionManager.forceRemoteCheckShardVersionCB(staleNS); + versionManager.forceRemoteCheckShardVersionCB(txn, staleNS); } catch (AssertionException& e) { Command::appendCommandStatus(builder, e.toStatus()); BSONObj x = builder.done(); @@ -396,7 +396,7 @@ void Strategy::clientCommandOp(Request& r) { } // TODO: remove after MongoDB 3.2 -bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { +bool Strategy::handleSpecialNamespaces(OperationContext* txn, Request& r, QueryMessage& q) { const char* ns = strstr(r.getns(), ".$cmd.sys."); if (!ns) return false; @@ -404,7 +404,7 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { BSONObjBuilder reply; - const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) { + const auto upgradeToRealCommand = [txn, &r, &q, &reply](StringData commandName) { BSONObjBuilder cmdBob; cmdBob.append(commandName, 1); cmdBob.appendElements(q.query); // fields are validated by Commands @@ -412,7 +412,7 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { NamespaceString nss(r.getns()); NamespaceString interposedNss(nss.db(), "$cmd"); Command::runAgainstRegistered( - interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions); + txn, interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions); }; if (strcmp(ns, "inprog") == 0) { @@ -431,7 +431,8 @@ bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { return true; } -void Strategy::commandOp(const string& db, +void Strategy::commandOp(OperationContext* txn, + const string& db, const BSONObj& command, int options, const string& versionedNS, @@ -442,7 +443,7 @@ void Strategy::commandOp(const string& db, ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery)); // Initialize the cursor - cursor.init(); + cursor.init(txn); set<ShardId> shardIds; cursor.getQueryShardIds(shardIds); @@ -458,14 +459,15 @@ void Strategy::commandOp(const string& db, } } -Status Strategy::commandOpUnsharded(const std::string& db, +Status Strategy::commandOpUnsharded(OperationContext* txn, + const std::string& db, const BSONObj& command, int options, const std::string& versionedNS, CommandResult* cmdResult) { // Note that this implementation will not handle targeting retries and fails when the // sharding metadata is too stale - auto status = grid.catalogCache()->getDatabase(db); + auto status = grid.catalogCache()->getDatabase(txn, db); if (!status.isOK()) { mongoutils::str::stream ss; ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS @@ -507,7 +509,7 @@ Status Strategy::commandOpUnsharded(const std::string& db, return Status::OK(); } -void Strategy::getMore(Request& r) { +void Strategy::getMore(OperationContext* txn, Request& r) { Timer getMoreTimer; const char* ns = r.getns(); @@ -517,7 +519,7 @@ void Strategy::getMore(Request& r) { // TODO: Handle stale config exceptions here from coll being dropped or sharded during op // for now has same semantics as legacy request const NamespaceString nss(ns); - auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString()); + auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (statusGetDb == ErrorCodes::DatabaseNotFound) { cursorCache.remove(id); replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); @@ -619,7 +621,7 @@ void Strategy::getMore(Request& r) { } } -void Strategy::writeOp(int op, Request& r) { +void Strategy::writeOp(OperationContext* txn, int op, Request& r) { // make sure we have a last error dassert(&LastError::get(cc())); @@ -648,7 +650,7 @@ void Strategy::writeOp(int op, Request& r) { { // Disable the last error object for the duration of the write cmd LastError::Disabled disableLastError(&LastError::get(cc())); - Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0); + Command::runAgainstRegistered(txn, cmdNS.c_str(), requestBSON, builder, 0); } BatchedCommandResponse response; diff --git a/src/mongo/s/strategy.h b/src/mongo/s/strategy.h index 40d71f2fd4d..d815f4af955 100644 --- a/src/mongo/s/strategy.h +++ b/src/mongo/s/strategy.h @@ -34,16 +34,18 @@ namespace mongo { +class OperationContext; + /** * Legacy interface for processing client read/write/cmd requests. */ class Strategy { public: - static void queryOp(Request& r); + static void queryOp(OperationContext* txn, Request& r); - static void getMore(Request& r); + static void getMore(OperationContext* txn, Request& r); - static void writeOp(int op, Request& r); + static void writeOp(OperationContext* txn, int op, Request& r); struct CommandResult { ShardId shardTargetId; @@ -60,7 +62,8 @@ public: * TODO: Replace these methods and all other methods of command dispatch with a more general * command op framework. */ - static void commandOp(const std::string& db, + static void commandOp(OperationContext* txn, + const std::string& db, const BSONObj& command, int options, const std::string& versionedNS, @@ -76,7 +79,8 @@ public: * On success, fills in 'shardResult' with output from the namespace's primary shard. This * output may itself indicate an error status on the shard. */ - static Status commandOpUnsharded(const std::string& db, + static Status commandOpUnsharded(OperationContext* txn, + const std::string& db, const BSONObj& command, int options, const std::string& versionedNS, @@ -87,10 +91,10 @@ public: * * DEPRECATED: should not be used by new code. */ - static void clientCommandOp(Request& r); + static void clientCommandOp(OperationContext* txn, Request& r); protected: - static bool handleSpecialNamespaces(Request& r, QueryMessage& q); + static bool handleSpecialNamespaces(OperationContext* txn, Request& r, QueryMessage& q); }; } // namespace mongo diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index 8faed905b60..6ba65451d4e 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -187,7 +187,7 @@ DBClientBase* getVersionable(DBClientBase* conn) { * Eventually this should go completely away, but for now many commands rely on unversioned but * mongos-specific behavior on mongod (auditing and replication information in commands) */ -bool initShardVersionEmptyNS(DBClientBase* conn_in) { +bool initShardVersionEmptyNS(OperationContext* txn, DBClientBase* conn_in) { bool ok; BSONObj result; DBClientBase* conn = NULL; @@ -214,7 +214,7 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) { ok = setShardVersion(*conn, "", - grid.catalogManager()->connectionString().toString(), + grid.catalogManager(txn)->connectionString().toString(), ChunkVersion(), NULL, true, @@ -273,7 +273,8 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) { * * @return true if we contacted the remote host */ -bool checkShardVersion(DBClientBase* conn_in, +bool checkShardVersion(OperationContext* txn, + DBClientBase* conn_in, const string& ns, ChunkManagerPtr refManager, bool authoritative, @@ -282,10 +283,10 @@ bool checkShardVersion(DBClientBase* conn_in, // Empty namespaces are special - we require initialization but not versioning if (ns.size() == 0) { - return initShardVersionEmptyNS(conn_in); + return initShardVersionEmptyNS(txn, conn_in); } - auto status = grid.catalogCache()->getDatabase(nsToDatabase(ns)); + auto status = grid.catalogCache()->getDatabase(txn, nsToDatabase(ns)); if (!status.isOK()) { return false; } @@ -300,7 +301,7 @@ bool checkShardVersion(DBClientBase* conn_in, ShardPtr primary; ChunkManagerPtr manager; if (authoritative) - conf->getChunkManagerIfExists(ns, true); + conf->getChunkManagerIfExists(txn, ns, true); conf->getChunkManagerOrPrimary(ns, manager, primary); @@ -372,7 +373,7 @@ bool checkShardVersion(DBClientBase* conn_in, BSONObj result; if (setShardVersion(*conn, ns, - grid.catalogManager()->connectionString().toString(), + grid.catalogManager(txn)->connectionString().toString(), version, manager.get(), authoritative, @@ -390,7 +391,7 @@ bool checkShardVersion(DBClientBase* conn_in, if (!authoritative) { // use the original connection and get a fresh versionable connection // since conn can be invalidated (or worse, freed) after the failure - checkShardVersion(conn_in, ns, refManager, 1, tryNumber + 1); + checkShardVersion(txn, conn_in, ns, refManager, 1, tryNumber + 1); return true; } @@ -400,10 +401,10 @@ bool checkShardVersion(DBClientBase* conn_in, << ", connection state indicates significant version changes"; // reload db - conf->reload(); + conf->reload(txn); } else { // reload config - conf->getChunkManager(ns, true); + conf->getChunkManager(txn, ns, true); } } @@ -414,7 +415,7 @@ bool checkShardVersion(DBClientBase* conn_in, sleepmillis(10 * tryNumber); // use the original connection and get a fresh versionable connection // since conn can be invalidated (or worse, freed) after the failure - checkShardVersion(conn_in, ns, refManager, true, tryNumber + 1); + checkShardVersion(txn, conn_in, ns, refManager, true, tryNumber + 1); return true; } @@ -443,13 +444,13 @@ bool VersionManager::isVersionableCB(DBClientBase* conn) { return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; } -bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { +bool VersionManager::forceRemoteCheckShardVersionCB(OperationContext* txn, const string& ns) { const NamespaceString nss(ns); // This will force the database catalog entry to be reloaded grid.catalogCache()->invalidate(nss.db().toString()); - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { return false; } @@ -461,7 +462,7 @@ bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { return false; } - ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); + ChunkManagerPtr manager = conf->getChunkManagerIfExists(txn, ns, true, true); if (!manager) { return false; } @@ -469,18 +470,20 @@ bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { return true; } -bool VersionManager::checkShardVersionCB(DBClientBase* conn_in, +bool VersionManager::checkShardVersionCB(OperationContext* txn, + DBClientBase* conn_in, const string& ns, bool authoritative, int tryNumber) { - return checkShardVersion(conn_in, ns, nullptr, authoritative, tryNumber); + return checkShardVersion(txn, conn_in, ns, nullptr, authoritative, tryNumber); } -bool VersionManager::checkShardVersionCB(ShardConnection* conn_in, +bool VersionManager::checkShardVersionCB(OperationContext* txn, + ShardConnection* conn_in, bool authoritative, int tryNumber) { return checkShardVersion( - conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber); + txn, conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber); } } // namespace mongo diff --git a/src/mongo/s/version_manager.h b/src/mongo/s/version_manager.h index 6dc6877d574..f03fb4f34c8 100644 --- a/src/mongo/s/version_manager.h +++ b/src/mongo/s/version_manager.h @@ -33,6 +33,7 @@ namespace mongo { class DBClientBase; +class OperationContext; class ShardConnection; @@ -41,9 +42,9 @@ public: VersionManager() {} bool isVersionableCB(DBClientBase*); - bool forceRemoteCheckShardVersionCB(const std::string&); - bool checkShardVersionCB(DBClientBase*, const std::string&, bool, int); - bool checkShardVersionCB(ShardConnection*, bool, int); + bool forceRemoteCheckShardVersionCB(OperationContext* txn, const std::string&); + bool checkShardVersionCB(OperationContext*, DBClientBase*, const std::string&, bool, int); + bool checkShardVersionCB(OperationContext*, ShardConnection*, bool, int); void resetShardVersionCB(DBClientBase*); }; diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 1603886e2ef..d2876b4e362 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -93,7 +93,8 @@ static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) { // This only applies when no writes are occurring and metadata is not changing on reload static const int kMaxRoundsWithoutProgress(5); -void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest, +void BatchWriteExec::executeBatch(OperationContext* txn, + const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse) { LOG(4) << "starting execution of write batch of size " << static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS() @@ -139,7 +140,8 @@ void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest, // If we've already had a targeting error, we've refreshed the metadata once and can // record target errors definitively. bool recordTargetErrors = refreshedTargeter; - Status targetStatus = batchOp.targetBatch(*_targeter, recordTargetErrors, &childBatches); + Status targetStatus = + batchOp.targetBatch(txn, *_targeter, recordTargetErrors, &childBatches); if (!targetStatus.isOK()) { // Don't do anything until a targeter refresh _targeter->noteCouldNotTarget(); @@ -316,7 +318,7 @@ void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest, // bool targeterChanged = false; - Status refreshStatus = _targeter->refreshIfNeeded(&targeterChanged); + Status refreshStatus = _targeter->refreshIfNeeded(txn, &targeterChanged); if (!refreshStatus.isOK()) { // It's okay if we can't refresh, we'll just record errors for the ops if diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h index 708b9481aef..f4d869d3a20 100644 --- a/src/mongo/s/write_ops/batch_write_exec.h +++ b/src/mongo/s/write_ops/batch_write_exec.h @@ -43,6 +43,7 @@ namespace mongo { class BatchWriteExecStats; class MultiCommandDispatch; +class OperationContext; /** * The BatchWriteExec is able to execute client batch write requests, resulting in a batch @@ -71,7 +72,8 @@ public: * * This function does not throw, any errors are reported via the clientResponse. */ - void executeBatch(const BatchedCommandRequest& clientRequest, + void executeBatch(OperationContext* txn, + const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse); const BatchWriteExecStats& getStats(); diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 435ebe113cd..f1e654c5a33 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -30,6 +30,7 @@ #include "mongo/base/owned_pointer_vector.h" +#include "mongo/db/operation_context_noop.h" #include "mongo/s/client/mock_multi_write_command.h" #include "mongo/s/mock_ns_targeter.h" #include "mongo/s/mock_shard_resolver.h" @@ -88,6 +89,7 @@ TEST(BatchWriteExecTests, SingleOp) { // Basic execution test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); MockSingleShardBackend backend(nss); @@ -100,7 +102,7 @@ TEST(BatchWriteExecTests, SingleOp) { request.getInsertRequest()->addToDocuments(BSON("x" << 1)); BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); const BatchWriteExecStats& stats = backend.exec->getStats(); @@ -112,6 +114,7 @@ TEST(BatchWriteExecTests, SingleOpError) { // Basic error test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); MockSingleShardBackend backend(nss); @@ -133,7 +136,7 @@ TEST(BatchWriteExecTests, SingleOpError) { request.getInsertRequest()->addToDocuments(BSON("x" << 1)); BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); ASSERT_EQUALS(response.getN(), 0); ASSERT(response.isErrDetailsSet()); @@ -154,6 +157,7 @@ TEST(BatchWriteExecTests, StaleOp) { // Retry op in exec b/c of stale config // + OperationContextNoop txn; NamespaceString nss("foo.bar"); // Insert request @@ -176,7 +180,7 @@ TEST(BatchWriteExecTests, StaleOp) { // Execute request BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); const BatchWriteExecStats& stats = backend.exec->getStats(); @@ -188,6 +192,7 @@ TEST(BatchWriteExecTests, MultiStaleOp) { // Retry op in exec multiple times b/c of stale config // + OperationContextNoop txn; NamespaceString nss("foo.bar"); // Insert request @@ -212,7 +217,7 @@ TEST(BatchWriteExecTests, MultiStaleOp) { // Execute request BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); const BatchWriteExecStats& stats = backend.exec->getStats(); @@ -226,6 +231,7 @@ TEST(BatchWriteExecTests, TooManyStaleOp) { // We should report a no progress error for everything in the batch // + OperationContextNoop txn; NamespaceString nss("foo.bar"); // Insert request @@ -252,7 +258,7 @@ TEST(BatchWriteExecTests, TooManyStaleOp) { // Execute request BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); ASSERT_EQUALS(response.getN(), 0); ASSERT(response.isErrDetailsSet()); @@ -265,6 +271,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) { // Retry op in exec many times b/c of stale config, but simulate remote migrations occurring // + OperationContextNoop txn; NamespaceString nss("foo.bar"); // Insert request @@ -294,7 +301,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) { // Execute request BatchedCommandResponse response; - backend.exec->executeBatch(request, &response); + backend.exec->executeBatch(&txn, request, &response); ASSERT(response.getOk()); const BatchWriteExecStats& stats = backend.exec->getStats(); diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 847b96ff9ee..57ea59c0b93 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -226,7 +226,8 @@ static void cancelBatches(const WriteErrorDetail& why, batchMap->clear(); } -Status BatchWriteOp::targetBatch(const NSTargeter& targeter, +Status BatchWriteOp::targetBatch(OperationContext* txn, + const NSTargeter& targeter, bool recordTargetErrors, vector<TargetedWriteBatch*>* targetedBatches) { // @@ -285,7 +286,7 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, OwnedPointerVector<TargetedWrite> writesOwned; vector<TargetedWrite*>& writes = writesOwned.mutableVector(); - Status targetStatus = writeOp.targetWrites(targeter, &writes); + Status targetStatus = writeOp.targetWrites(txn, targeter, &writes); if (!targetStatus.isOK()) { WriteErrorDetail targetError; diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 0add9339268..2d50bd39ed5 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -44,6 +44,7 @@ namespace mongo { +class OperationContext; class TargetedWriteBatch; struct ShardError; struct ShardWCError; @@ -105,7 +106,8 @@ public: * * Returned TargetedWriteBatches are owned by the caller. */ - Status targetBatch(const NSTargeter& targeter, + Status targetBatch(OperationContext* txn, + const NSTargeter& targeter, bool recordTargetErrors, std::vector<TargetedWriteBatch*>* targetedBatches); diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index da5f4e1775f..68dfc4deeb7 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -29,6 +29,7 @@ #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/base/owned_pointer_vector.h" +#include "mongo/db/operation_context_noop.h" #include "mongo/s/mock_ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_delete_document.h" @@ -135,6 +136,7 @@ TEST(WriteOpTests, SingleOp) { // Single-op targeting test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -151,7 +153,7 @@ TEST(WriteOpTests, SingleOp) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -174,6 +176,7 @@ TEST(WriteOpTests, SingleError) { // Single-op error test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -190,7 +193,7 @@ TEST(WriteOpTests, SingleError) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -219,6 +222,7 @@ TEST(WriteOpTests, SingleTargetError) { // Single-op targeting error test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -235,14 +239,14 @@ TEST(WriteOpTests, SingleTargetError) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(!status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 0u); // Record targeting failures - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); ASSERT(status.isOK()); ASSERT(batchOp.isFinished()); @@ -261,6 +265,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) { // write concern error if one occurs // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -277,7 +282,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -310,6 +315,7 @@ TEST(WriteOpTests, SingleStaleError) { // We should retry the same batch until we're not stale // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -324,7 +330,7 @@ TEST(WriteOpTests, SingleStaleError) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(0, &response); @@ -335,14 +341,14 @@ TEST(WriteOpTests, SingleStaleError) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); // Respond again with a stale response batchOp.noteBatchResponse(*targeted.front(), response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); buildResponse(1, &response); @@ -376,6 +382,7 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) { // Multi-op targeting test (ordered) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -394,7 +401,7 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -419,6 +426,7 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) { // Multi-op targeting test (unordered) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -437,7 +445,7 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -463,6 +471,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { // There should be two sets of single batches (one to each shard, one-by-one) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -482,7 +491,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -498,7 +507,7 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); @@ -521,6 +530,7 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) { // There should be one set of two batches (one to each shard) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -540,7 +550,7 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -572,6 +582,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) { // There should be two sets of two batches to each shard (two for each delete op) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -593,7 +604,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -614,7 +625,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); @@ -642,6 +653,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) { // There should be one set of two batches to each shard (containing writes for both ops) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -663,7 +675,7 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -697,6 +709,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { // last ops should be batched together // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -724,7 +737,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -741,7 +754,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -762,7 +775,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -780,7 +793,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -808,6 +821,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) { // Should batch all the ops together into two batches of four ops for each shard // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -835,7 +849,7 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -869,6 +883,7 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) { // There should be one set of two batches to each shard and an error reported // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -887,7 +902,7 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -932,6 +947,7 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) { // There should be one set of two batches to each shard and and two errors reported // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -950,7 +966,7 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -998,6 +1014,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) { // There should be one set of two batches to each shard and an error reported // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1018,7 +1035,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -1064,6 +1081,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) { // op should not get run // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1084,7 +1102,7 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); @@ -1134,6 +1152,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) { // Don't suppress the error if ordered : false // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1151,7 +1170,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1178,6 +1197,7 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) { // Suppress the write concern error if ordered and we also have an error // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1196,7 +1216,7 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1228,6 +1248,7 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) { // Targeting failure on second op in batch op (ordered) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1246,14 +1267,14 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); // First targeting round fails since we may be stale ASSERT(!status.isOK()); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); // Second targeting round is ok, but should stop at first write ASSERT(status.isOK()); @@ -1269,7 +1290,7 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); // Second targeting round results in an error which finishes the batch ASSERT(status.isOK()); @@ -1290,6 +1311,7 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) { // Targeting failure on second op in batch op (unordered) // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1309,14 +1331,14 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); // First targeting round fails since we may be stale ASSERT(!status.isOK()); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); // Second targeting round is ok, and should record an error ASSERT(status.isOK()); @@ -1346,6 +1368,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) { // Expect this gets translated down into write errors for first affected write // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1363,7 +1386,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1373,7 +1396,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); buildErrResponse(ErrorCodes::UnknownError, "mock error", &response); @@ -1398,6 +1421,7 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) { // Expect this gets translated down into write errors for all affected writes // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1416,7 +1440,7 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1450,6 +1474,7 @@ TEST(WriteOpTests, MultiOpAbortOrdered) { // Expect this gets translated down into write error for first affected write // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1467,7 +1492,7 @@ TEST(WriteOpTests, MultiOpAbortOrdered) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1499,6 +1524,7 @@ TEST(WriteOpTests, MultiOpAbortUnordered) { // Expect this gets translated down into write errors for all affected writes // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1539,6 +1565,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) { // error // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion::IGNORED()); ShardEndpoint endpointB("shardB", ChunkVersion::IGNORED()); @@ -1556,7 +1583,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); @@ -1567,7 +1594,7 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, true, &targeted); + status = batchOp.targetBatch(&txn, targeter, true, &targeted); // Second shard write write concern fails. batchOp.noteBatchResponse(*targeted.front(), response, NULL); @@ -1590,6 +1617,7 @@ TEST(WriteOpLimitTests, OneBigDoc) { // Big single operation test - should go through // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1608,7 +1636,7 @@ TEST(WriteOpLimitTests, OneBigDoc) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); @@ -1624,6 +1652,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) { // Big doc with smaller additional doc - should go through as two batches // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1644,7 +1673,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); @@ -1656,7 +1685,7 @@ TEST(WriteOpLimitTests, OneBigOneSmall) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); @@ -1670,6 +1699,7 @@ TEST(WriteOpLimitTests, TooManyOps) { // Batch of 1002 documents // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1688,7 +1718,7 @@ TEST(WriteOpLimitTests, TooManyOps) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u); @@ -1700,7 +1730,7 @@ TEST(WriteOpLimitTests, TooManyOps) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); @@ -1715,6 +1745,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) { // calculation // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); MockNSTargeter targeter; @@ -1750,7 +1781,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) { OwnedPointerVector<TargetedWriteBatch> targetedOwned; vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); - Status status = batchOp.targetBatch(targeter, false, &targeted); + Status status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); @@ -1766,7 +1797,7 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) { ASSERT(!batchOp.isFinished()); targetedOwned.clear(); - status = batchOp.targetBatch(targeter, false, &targeted); + status = batchOp.targetBatch(&txn, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index f630802121d..24dd177cbda 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -63,7 +63,8 @@ const WriteErrorDetail& WriteOp::getOpError() const { return *_error; } -Status WriteOp::targetWrites(const NSTargeter& targeter, +Status WriteOp::targetWrites(OperationContext* txn, + const NSTargeter& targeter, std::vector<TargetedWrite*>* targetedWrites) { bool isUpdate = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Update; bool isDelete = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete; @@ -74,16 +75,16 @@ Status WriteOp::targetWrites(const NSTargeter& targeter, vector<ShardEndpoint*>& endpoints = endpointsOwned.mutableVector(); if (isUpdate) { - targetStatus = targeter.targetUpdate(*_itemRef.getUpdate(), &endpoints); + targetStatus = targeter.targetUpdate(txn, *_itemRef.getUpdate(), &endpoints); } else if (isDelete) { - targetStatus = targeter.targetDelete(*_itemRef.getDelete(), &endpoints); + targetStatus = targeter.targetDelete(txn, *_itemRef.getDelete(), &endpoints); } else { dassert(_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert); ShardEndpoint* endpoint = NULL; // TODO: Remove the index targeting stuff once there is a command for it if (!isIndexInsert) { - targetStatus = targeter.targetInsert(_itemRef.getDocument(), &endpoint); + targetStatus = targeter.targetInsert(txn, _itemRef.getDocument(), &endpoint); } else { // TODO: Retry index writes with stale version? targetStatus = targeter.targetCollection(&endpoints); diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index fb189edfffb..bd50896b04a 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -122,7 +122,9 @@ public: * Returns !OK if the targeting process itself fails * (no TargetedWrites will be added, state unchanged) */ - Status targetWrites(const NSTargeter& targeter, std::vector<TargetedWrite*>* targetedWrites); + Status targetWrites(OperationContext* txn, + const NSTargeter& targeter, + std::vector<TargetedWrite*>* targetedWrites); /** * Returns the number of child writes that were last targeted. diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index dd8b116c7ed..a180fc8abdc 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -31,6 +31,7 @@ #include "mongo/base/error_codes.h" #include "mongo/base/owned_pointer_vector.h" +#include "mongo/db/operation_context_noop.h" #include "mongo/s/mock_ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_delete_document.h" @@ -82,6 +83,7 @@ TEST(WriteOpTests, TargetSingle) { // Basic targeting test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); @@ -103,7 +105,7 @@ TEST(WriteOpTests, TargetSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); @@ -139,6 +141,7 @@ TEST(WriteOpTests, TargetMultiOneShard) { // Multi-write targeting test where our query goes to one shard // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion(10, 0, OID())); @@ -164,7 +167,7 @@ TEST(WriteOpTests, TargetMultiOneShard) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); @@ -181,6 +184,7 @@ TEST(WriteOpTests, TargetMultiAllShards) { // Multi-write targeting test where our write goes to more than one shard // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpointA("shardA", ChunkVersion(10, 0, OID())); @@ -207,7 +211,7 @@ TEST(WriteOpTests, TargetMultiAllShards) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); @@ -232,6 +236,7 @@ TEST(WriteOpTests, ErrorSingle) { // Single error after targeting test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); @@ -253,7 +258,7 @@ TEST(WriteOpTests, ErrorSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); @@ -277,6 +282,7 @@ TEST(WriteOpTests, CancelSingle) { // Cancel single targeting test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); @@ -298,7 +304,7 @@ TEST(WriteOpTests, CancelSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); @@ -319,6 +325,7 @@ TEST(WriteOpTests, RetrySingleOp) { // Retry single targeting test // + OperationContextNoop txn; NamespaceString nss("foo.bar"); ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); @@ -340,7 +347,7 @@ TEST(WriteOpTests, RetrySingleOp) { OwnedPointerVector<TargetedWrite> targetedOwned; vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(targeter, &targeted); + Status status = writeOp.targetWrites(&txn, targeter, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); |