diff options
author | Henrik Edin <henrik.edin@mongodb.com> | 2017-12-15 11:33:30 -0500 |
---|---|---|
committer | Henrik Edin <henrik.edin@mongodb.com> | 2018-01-22 13:51:26 -0500 |
commit | c376f4b80d26028b6a8746f8545a35e390b59bf2 (patch) | |
tree | bffed1afdcc737dcbc899af35c84ccd6cbc4d100 /src | |
parent | 50921266423bf59267b08e0f8ee23469ea03d768 (diff) | |
download | mongo-c376f4b80d26028b6a8746f8545a35e390b59bf2.tar.gz |
SERVER-29519 Removed many usages of getGlobalReplicationCoordinator
Diffstat (limited to 'src')
55 files changed, 142 insertions, 134 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index d38983423cf..90af4fbca6d 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -55,7 +55,7 @@ mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, collectionName); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, collectionName); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, @@ -84,7 +84,7 @@ mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& << collectionName.ns()); } - if ((repl::getGlobalReplicationCoordinator()->getReplicationMode() != + if ((repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeNone) && collectionName.isOplog()) { return Status(ErrorCodes::OplogOperationUnsupported, @@ -258,7 +258,7 @@ mongo::Status mongo::convertToCapped(OperationContext* opCtx, AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, collectionName); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, collectionName); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index 1788b5342c8..9d31d1bfa8b 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -311,7 +311,7 @@ Status _collModInternal(OperationContext* opCtx, OldClientContext ctx(opCtx, nss.ns()); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, @@ -650,7 +650,7 @@ void updateUUIDSchemaVersion(OperationContext* opCtx, bool upgrade) { const WriteConcernOptions writeConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, /*timeout*/ INT_MAX); - repl::getGlobalReplicationCoordinator()->awaitReplication(opCtx, awaitOpTime, writeConcern); + repl::ReplicationCoordinator::get(opCtx)->awaitReplication(opCtx, awaitOpTime, writeConcern); } Status updateUUIDSchemaVersionNonReplicated(OperationContext* opCtx, bool upgrade) { diff --git a/src/mongo/db/catalog/database_test.cpp b/src/mongo/db/catalog/database_test.cpp index 35c1127bb64..2671183e747 100644 --- a/src/mongo/db/catalog/database_test.cpp +++ b/src/mongo/db/catalog/database_test.cpp @@ -88,7 +88,7 @@ void DatabaseTest::setUp() { // Set up ReplicationCoordinator and create oplog. repl::ReplicationCoordinator::set(service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(_opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index cd9e6ad4106..33b9acebea0 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -73,7 +73,7 @@ Status dropCollection(OperationContext* opCtx, OldClientContext context(opCtx, collectionName.ns(), shardVersionCheck); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, collectionName); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, collectionName); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/drop_database_test.cpp b/src/mongo/db/catalog/drop_database_test.cpp index f26a624d687..9b5aa45c9d3 100644 --- a/src/mongo/db/catalog/drop_database_test.cpp +++ b/src/mongo/db/catalog/drop_database_test.cpp @@ -136,7 +136,7 @@ void DropDatabaseTest::setUp() { auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service); _replCoord = replCoord.get(); repl::ReplicationCoordinator::set(service, std::move(replCoord)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(_opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp index 29b9ec80cd5..67a6a2f0903 100644 --- a/src/mongo/db/catalog/drop_indexes.cpp +++ b/src/mongo/db/catalog/drop_indexes.cpp @@ -153,7 +153,7 @@ Status dropIndexes(OperationContext* opCtx, AutoGetDb autoDb(opCtx, nss.db(), MODE_X); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 62622e710ec..e5746e61c7a 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -711,7 +711,7 @@ Status IndexCatalogImpl::_isSpecOk(OperationContext* opCtx, const BSONObj& spec) } else { // for non _id indexes, we check to see if replication has turned off all indexes // we _always_ created _id index - if (!repl::getGlobalReplicationCoordinator()->buildsIndexes()) { + if (!repl::ReplicationCoordinator::get(opCtx)->buildsIndexes()) { // this is not exactly the right error code, but I think will make the most sense return Status(ErrorCodes::IndexAlreadyExists, "no indexes per repl"); } diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index d9f80ccfd5d..93701b0b30e 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -102,7 +102,7 @@ Status renameCollectionCommon(OperationContext* opCtx, ctx.emplace(opCtx, source.ns()); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, source); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, source); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index 36b1ed782f0..bc99b849732 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -226,7 +226,7 @@ void RenameCollectionTest::setUp() { auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service); _replCoord = replCoord.get(); repl::ReplicationCoordinator::set(service, std::move(replCoord)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(_opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index a2ba154138c..46817485eba 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -140,7 +140,7 @@ void ClientCursor::updateSlaveLocation(OperationContext* opCtx) { if (!rid.isSet()) return; - repl::getGlobalReplicationCoordinator() + repl::ReplicationCoordinator::get(opCtx) ->setLastOptimeForSlave(rid, _slaveReadTill) .transitional_ignore(); } diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 7ab9d368df1..5dece280806 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -148,7 +148,7 @@ struct Cloner::Fun { << " to " << to_collection.ns(), !opCtx->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); // Make sure database still exists after we resume from the temp release Database* db = dbHolder().openDb(opCtx, _dbName); @@ -204,7 +204,7 @@ struct Cloner::Fun { uassert(ErrorCodes::PrimarySteppedDown, str::stream() << "Cannot write to ns: " << to_collection.ns() << " after yielding", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor( + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor( opCtx, to_collection)); } @@ -336,7 +336,7 @@ void Cloner::copy(OperationContext* opCtx, << " with filter " << query.toString(), !opCtx->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); } void Cloner::copyIndexes(OperationContext* opCtx, @@ -359,7 +359,7 @@ void Cloner::copyIndexes(OperationContext* opCtx, << to_collection.ns() << " (Cloner)", !opCtx->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, to_collection)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, to_collection)); if (indexesToBuild.empty()) @@ -472,7 +472,7 @@ bool Cloner::copyCollection(OperationContext* opCtx, uassert(ErrorCodes::PrimarySteppedDown, str::stream() << "Not primary while copying collection " << ns << " (Cloner)", !opCtx->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); Database* db = dbHolder().openDb(opCtx, dbname); @@ -714,7 +714,7 @@ Status Cloner::copyDb(OperationContext* opCtx, str::stream() << "Not primary while cloning database " << opts.fromDB << " (after getting list of collections to clone)", !opCtx->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(opCtx, toDBName)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, toDBName)); if (opts.syncData) { if (opts.createCollections) { diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index 5f283fa2195..8348dcd938a 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -120,7 +120,7 @@ public: AutoGetDb autoDb(opCtx, dbname, MODE_X); NamespaceString nss(dbname, to); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/commands/compact.cpp b/src/mongo/db/commands/compact.cpp index a4371eb129d..cb8ab25bd7a 100644 --- a/src/mongo/db/commands/compact.cpp +++ b/src/mongo/db/commands/compact.cpp @@ -93,7 +93,7 @@ public: BSONObjBuilder& result) { NamespaceString nss = CommandHelpers::parseNsCollectionRequired(db, cmdObj); - repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); + repl::ReplicationCoordinator* replCoord = repl::ReplicationCoordinator::get(opCtx); if (replCoord->getMemberState().primary() && !cmdObj["force"].trueValue()) { errmsg = "will not run compact on an active replica set primary as this is a slow blocking " diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 79c38e4424b..63f94e9f5cf 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -258,7 +258,7 @@ public: // now we know we have to create index(es) // Note: createIndexes command does not currently respect shard versioning. Lock::DBLock dbLock(opCtx, ns.db(), MODE_X); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, ns)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, ns)) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::NotMaster, @@ -337,7 +337,7 @@ public: if (indexer.getBuildInBackground()) { opCtx->recoveryUnit()->abandonSnapshot(); dbLock.relockWithMode(MODE_IX); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, ns)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, ns)) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::NotMaster, @@ -359,7 +359,7 @@ public: // that day, to avoid data corruption due to lack of index cleanup. opCtx->recoveryUnit()->abandonSnapshot(); dbLock.relockWithMode(MODE_X); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, ns)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, ns)) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::NotMaster, @@ -381,7 +381,7 @@ public: dbLock.relockWithMode(MODE_X); uassert(ErrorCodes::NotMaster, str::stream() << "Not primary while completing index build in " << dbname, - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, ns)); + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, ns)); Database* db = dbHolder().get(opCtx, ns.db()); uassert(28551, "database dropped during index build", db); diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index c71161bcb2b..8e63f8f2137 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -129,7 +129,7 @@ public: timeoutSecs = cmdObj["timeoutSecs"].numberLong(); } - Status status = repl::getGlobalReplicationCoordinator()->stepDown( + Status status = repl::ReplicationCoordinator::get(opCtx)->stepDown( opCtx, force, Seconds(timeoutSecs), Seconds(120)); if (!status.isOK() && status.code() != ErrorCodes::NotMaster) { // ignore not master return CommandHelpers::appendCommandStatus(result, status); @@ -180,7 +180,7 @@ public: "with --configsvr")); } - if ((repl::getGlobalReplicationCoordinator()->getReplicationMode() != + if ((repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeNone) && ((dbname == NamespaceString::kLocalDb) || (dbname == NamespaceString::kAdminDb))) { return CommandHelpers::appendCommandStatus( @@ -430,7 +430,7 @@ public: return false; } - if ((repl::getGlobalReplicationCoordinator()->getReplicationMode() != + if ((repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeNone) && nsToDrop.isOplog()) { errmsg = "can't drop live oplog while replicating"; diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp index a2adaef7d38..c89ca471c2d 100644 --- a/src/mongo/db/commands/explain_cmd.cpp +++ b/src/mongo/db/commands/explain_cmd.cpp @@ -152,7 +152,7 @@ public: // Check whether the child command is allowed to run here. TODO: this logic is // copied from Command::execCommand and should be abstracted. Until then, make // sure to keep it up to date. - repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); + repl::ReplicationCoordinator* replCoord = repl::ReplicationCoordinator::get(opCtx); bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); bool commandCanRunOnSecondary = commToExplain->slaveOk(); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 9e015a9f964..c76cbbd5516 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -186,7 +186,7 @@ void appendCommandResponse(const PlanExecutor* exec, } Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceString& nsString) { - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nsString)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString)) { return Status(ErrorCodes::NotMaster, str::stream() << "Not primary while running findAndModify command on collection " diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp index 5975a311fcc..c80e18e03bb 100644 --- a/src/mongo/db/commands/generic.cpp +++ b/src/mongo/db/commands/generic.cpp @@ -240,7 +240,7 @@ public: BSONObjBuilder& result) { bool didRotate = rotateLogs(serverGlobalParams.logRenameOnRotate); if (didRotate) - logProcessDetailsForLogRotate(); + logProcessDetailsForLogRotate(opCtx->getServiceContext()); return didRotate; } diff --git a/src/mongo/db/commands/get_last_error.cpp b/src/mongo/db/commands/get_last_error.cpp index a0b66d375ef..567ea05a6e6 100644 --- a/src/mongo/db/commands/get_last_error.cpp +++ b/src/mongo/db/commands/get_last_error.cpp @@ -146,7 +146,7 @@ public: // Always append lastOp and connectionId Client& c = *opCtx->getClient(); - auto replCoord = repl::getGlobalReplicationCoordinator(); + auto replCoord = repl::ReplicationCoordinator::get(opCtx); if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { const repl::OpTime lastOp = repl::ReplClientInfo::forClient(c).getLastOp(); if (!lastOp.isNull()) { @@ -227,7 +227,7 @@ public: WriteConcernOptions writeConcern; if (useDefaultGLEOptions) { - writeConcern = repl::getGlobalReplicationCoordinator()->getGetLastErrorDefault(); + writeConcern = repl::ReplicationCoordinator::get(opCtx)->getGetLastErrorDefault(); } Status status = writeConcern.parse(writeConcernDoc); @@ -259,7 +259,7 @@ public: // If we got an electionId, make sure it matches if (electionIdPresent) { - if (repl::getGlobalReplicationCoordinator()->getReplicationMode() != + if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { // Ignore electionIds of 0 from mongos. if (electionId != OID()) { @@ -269,9 +269,9 @@ public: return false; } } else { - if (electionId != repl::getGlobalReplicationCoordinator()->getElectionId()) { + if (electionId != repl::ReplicationCoordinator::get(opCtx)->getElectionId()) { LOG(3) << "oid passed in is " << electionId << ", but our id is " - << repl::getGlobalReplicationCoordinator()->getElectionId(); + << repl::ReplicationCoordinator::get(opCtx)->getElectionId(); errmsg = "election occurred after write"; result.append("code", ErrorCodes::WriteConcernFailed); result.append("codeName", diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 35b0aaca426..7431474ee6a 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -371,7 +371,7 @@ void State::dropTempCollections() { WriteUnitOfWork wunit(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor( + repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor( _opCtx, _config.tempNamespace)); db->dropCollection(_opCtx, _config.tempNamespace.ns()).transitional_ignore(); wunit.commit(); @@ -498,8 +498,8 @@ void State::prepTempCollection() { WriteUnitOfWork wuow(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, - _config.tempNamespace)); + repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor( + _opCtx, _config.tempNamespace)); Collection* tempColl = tempCtx.getCollection(); invariant(!tempColl); @@ -756,7 +756,7 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { WriteUnitOfWork wuow(_opCtx); uassert(ErrorCodes::PrimarySteppedDown, "no longer primary", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_opCtx, nss)); + repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, nss)); Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss); BSONObjBuilder b; @@ -1469,8 +1469,8 @@ public: if (state.isOnDisk()) { // this means that it will be doing a write operation, make sure it is safe to // do so. - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, - config.nss)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, + config.nss)) { uasserted(ErrorCodes::NotMaster, "not master"); return false; } diff --git a/src/mongo/db/commands/rename_collection_cmd.cpp b/src/mongo/db/commands/rename_collection_cmd.cpp index c4217c02e6e..aac288179a2 100644 --- a/src/mongo/db/commands/rename_collection_cmd.cpp +++ b/src/mongo/db/commands/rename_collection_cmd.cpp @@ -110,7 +110,7 @@ public: str::stream() << "Invalid target namespace: " << target.ns(), target.isValid()); - if ((repl::getGlobalReplicationCoordinator()->getReplicationMode() != + if ((repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() != repl::ReplicationCoordinator::modeNone)) { if (source.isOplog()) { errmsg = "can't rename live oplog while replicating"; diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index f7d37050f54..a3bc4d3f990 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -372,7 +372,7 @@ void checkForIdIndexesAndDropPendingCollections(OperationContext* opCtx, Databas unsigned long long checkIfReplMissingFromCommandLine(OperationContext* opCtx) { // This is helpful for the query below to work as you can't open files when readlocked Lock::GlobalWrite lk(opCtx); - if (!repl::getGlobalReplicationCoordinator()->getSettings().usingReplSets()) { + if (!repl::ReplicationCoordinator::get(opCtx)->getSettings().usingReplSets()) { DBDirectClient c(opCtx); return c.count(kSystemReplSetCollection.ns()); } @@ -434,7 +434,8 @@ StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx) { } } - const repl::ReplSettings& replSettings = repl::getGlobalReplicationCoordinator()->getSettings(); + const repl::ReplSettings& replSettings = + repl::ReplicationCoordinator::get(opCtx)->getSettings(); if (!storageGlobalParams.readOnly) { StatusWith<std::vector<StorageEngine::CollectionIndexNamePair>> swIndexesToRebuild = @@ -918,7 +919,7 @@ ExitCode _initAndListen(int listenPort) { if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { // Note: For replica sets, ShardingStateRecovery happens on transition to primary. - if (!repl::getGlobalReplicationCoordinator()->isReplEnabled()) { + if (!repl::ReplicationCoordinator::get(startupOpCtx.get())->isReplEnabled()) { uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get())); } } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { @@ -1202,7 +1203,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, storageInterface, static_cast<int64_t>(curTimeMillis64())); repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(serviceContext); return Status::OK(); } diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 02931d128b5..57820ea7a27 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -276,7 +276,7 @@ void DeleteStage::doRestoreState() { uassert(ErrorCodes::PrimarySteppedDown, str::stream() << "Demoted from primary while removing from " << ns.ns(), !getOpCtx()->writesAreReplicated() || - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(getOpCtx(), ns)); + repl::ReplicationCoordinator::get(getOpCtx())->canAcceptWritesFor(getOpCtx(), ns)); } unique_ptr<PlanStageStats> DeleteStage::getStats() { diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 53ba1fc9617..0a214d97bd9 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -716,7 +716,7 @@ void UpdateStage::doRestoreState() { // We may have stepped down during the yield. bool userInitiatedWritesAndNotPrimary = getOpCtx()->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(getOpCtx(), nsString); + !repl::ReplicationCoordinator::get(getOpCtx())->canAcceptWritesFor(getOpCtx(), nsString); if (userInitiatedWritesAndNotPrimary) { uasserted(ErrorCodes::PrimarySteppedDown, diff --git a/src/mongo/db/log_process_details.cpp b/src/mongo/db/log_process_details.cpp index 4381524509e..681db42d7e1 100644 --- a/src/mongo/db/log_process_details.cpp +++ b/src/mongo/db/log_process_details.cpp @@ -59,12 +59,12 @@ void logProcessDetails() { printCommandLineOpts(); } -void logProcessDetailsForLogRotate() { +void logProcessDetailsForLogRotate(ServiceContext* serviceContext) { log() << "pid=" << ProcessId::getCurrent() << " port=" << serverGlobalParams.port << (is32bit() ? " 32" : " 64") << "-bit " << "host=" << getHostNameCached(); - auto replCoord = repl::getGlobalReplicationCoordinator(); + auto replCoord = repl::ReplicationCoordinator::get(serviceContext); if (replCoord != nullptr && replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { auto rsConfig = replCoord->getConfig(); diff --git a/src/mongo/db/log_process_details.h b/src/mongo/db/log_process_details.h index 04ef7ddfe5d..c90921d0ee3 100644 --- a/src/mongo/db/log_process_details.h +++ b/src/mongo/db/log_process_details.h @@ -30,6 +30,8 @@ namespace mongo { +class ServiceContext; + /** * Writes useful information about the running process to the diagnostic log on startup. */ @@ -39,6 +41,6 @@ void logProcessDetails(); * Writes useful information about the running process to diagnostic log * for after a log rotation. */ -void logProcessDetailsForLogRotate(); +void logProcessDetailsForLogRotate(ServiceContext* serviceContext); } // namespace mongo diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 831a4e750a1..47d9a558a63 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -55,7 +55,7 @@ private: // Set up ReplicationCoordinator and create oplog. repl::ReplicationCoordinator::set( service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 2b1b7225513..8bc1344c0e0 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -79,7 +79,7 @@ UpdateResult update(OperationContext* opCtx, Database* db, const UpdateRequest& Lock::DBLock lk(opCtx, nsString.db(), MODE_X); const bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nsString); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString); if (userInitiatedWritesAndNotPrimary) { uassertStatusOK(Status(ErrorCodes::PrimarySteppedDown, diff --git a/src/mongo/db/prefetch.cpp b/src/mongo/db/prefetch.cpp index 5a43e2ab982..615721b5e31 100644 --- a/src/mongo/db/prefetch.cpp +++ b/src/mongo/db/prefetch.cpp @@ -155,7 +155,7 @@ void prefetchPagesForReplicatedOp(OperationContext* opCtx, const OplogEntry& oplogEntry) { invariant(db); const ReplSettings::IndexPrefetchConfig prefetchConfig = - getGlobalReplicationCoordinator()->getIndexPrefetchConfig(); + ReplicationCoordinator::get(opCtx)->getIndexPrefetchConfig(); // Prefetch ignores non-CRUD operations. if (!oplogEntry.isCrudOpType()) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index e6b921ed459..84ab8649f2f 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -734,7 +734,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDelete( } bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::PrimarySteppedDown, @@ -884,7 +884,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpdate( // writes on a secondary. If this is an update to a secondary from the replication system, // however, then we make an exception and let the write proceed. bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss); + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss); if (userInitiatedWritesAndNotPrimary) { return Status(ErrorCodes::PrimarySteppedDown, diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index cc56d823fa6..5a799beed44 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -91,7 +91,7 @@ void ApplyOpsTest::setUp() { // Set up ReplicationCoordinator and create oplog. ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service)); - setOplogCollectionName(); + setOplogCollectionName(service); createOplog(opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 7a411a498b3..452b3c24941 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -139,11 +139,12 @@ static ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxS BackgroundSync::BackgroundSync( + ReplicationCoordinator* replicationCoordinator, ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, ReplicationProcess* replicationProcess, std::unique_ptr<OplogBuffer> oplogBuffer) : _oplogBuffer(std::move(oplogBuffer)), - _replCoord(getGlobalReplicationCoordinator()), + _replCoord(replicationCoordinator), _replicationCoordinatorExternalState(replicationCoordinatorExternalState), _replicationProcess(replicationProcess) { // Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 427d195149c..cf1f064c4f5 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -77,7 +77,8 @@ public: */ enum class ProducerState { Starting, Running, Stopped }; - BackgroundSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, + BackgroundSync(ReplicationCoordinator* replicationCoordinator, + ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, ReplicationProcess* replicationProcess, std::unique_ptr<OplogBuffer> oplogBuffer); diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp index de0de6ab4b9..419b28a82e2 100644 --- a/src/mongo/db/repl/do_txn_test.cpp +++ b/src/mongo/db/repl/do_txn_test.cpp @@ -90,7 +90,7 @@ void DoTxnTest::setUp() { // Set up ReplicationCoordinator and create oplog. ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service)); - setOplogCollectionName(); + setOplogCollectionName(service); createOplog(opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 08961a2c840..63210ca1bed 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -270,7 +270,7 @@ void ReplSource::loadAll(OperationContext* opCtx, SourceVector& v) { SourceVector old = v; v.clear(); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); if (!replSettings.getSource().empty()) { // --source <host> specified. // check that no items are in sources other than that @@ -394,7 +394,7 @@ public: ReplClientInfo::forClient(opCtx->getClient()).setRemoteID(handshake.getRid()); - status = getGlobalReplicationCoordinator()->processHandshake(opCtx, handshake); + status = ReplicationCoordinator::get(opCtx)->processHandshake(opCtx, handshake); return CommandHelpers::appendCommandStatus(result, status); } @@ -439,7 +439,7 @@ void ReplSource::forceResync(OperationContext* opCtx, const char* requester) { if (!_connect(&oplogReader, HostAndPort(hostName), - getGlobalReplicationCoordinator()->getMyRID())) { + ReplicationCoordinator::get(opCtx)->getMyRID())) { msgasserted(14051, "unable to connect to resync"); } bool ok = oplogReader.conn()->runCommand( @@ -466,12 +466,12 @@ void ReplSource::forceResync(OperationContext* opCtx, const char* requester) { save(opCtx); } -Status ReplSource::_updateIfDoneWithInitialSync() { +Status ReplSource::_updateIfDoneWithInitialSync(OperationContext* opCtx) { const auto usedToDoHandshake = _doHandshake; if (!usedToDoHandshake && addDbNextPass.empty() && incompleteCloneDbs.empty()) { _doHandshake = true; oplogReader.resetConnection(); - const auto myRID = getGlobalReplicationCoordinator()->getMyRID(); + const auto myRID = ReplicationCoordinator::get(opCtx)->getMyRID(); if (!_connect(&oplogReader, HostAndPort{hostName}, myRID)) { return {ErrorCodes::MasterSlaveConnectionFailure, str::stream() << "could not connect to " << hostName << " with rid: " @@ -727,7 +727,7 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* opCtx, // reported. CurOp individualOp(opCtx); UnreplicatedWritesBlock uwb(opCtx); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); if (replSettings.getPretouch() && !alreadyLocked /*doesn't make sense if in write lock already*/) { if (replSettings.getPretouch() > 1) { @@ -949,7 +949,7 @@ int ReplSource::_sync_pullOpLog(OperationContext* opCtx, int& nApplied) { } } - auto status = _updateIfDoneWithInitialSync(); + auto status = _updateIfDoneWithInitialSync(opCtx); if (!status.isOK()) { switch (status.code()) { case ErrorCodes::Interrupted: { @@ -1089,7 +1089,8 @@ int ReplSource::_sync_pullOpLog(OperationContext* opCtx, int& nApplied) { "replication error last applied optime at slave >= nextOpTime from master", false); } - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + const ReplSettings& replSettings = + ReplicationCoordinator::get(opCtx)->getSettings(); if (replSettings.getSlaveDelaySecs() != Seconds(0) && (Seconds(time(0)) < Seconds(nextOpTime.getSecs()) + replSettings.getSlaveDelaySecs())) { @@ -1154,7 +1155,7 @@ int ReplSource::sync(OperationContext* opCtx, int& nApplied) { } if (!_connect( - &oplogReader, HostAndPort(hostName), getGlobalReplicationCoordinator()->getMyRID())) { + &oplogReader, HostAndPort(hostName), ReplicationCoordinator::get(opCtx)->getMyRID())) { LOG(4) << "can't connect to sync source" << endl; return -1; } @@ -1236,7 +1237,7 @@ static void replMain(OperationContext* opCtx) { Lock::GlobalWrite lk(opCtx); if (replAllDead) { // throttledForceResyncDead can throw - if (!getGlobalReplicationCoordinator()->getSettings().isAutoResyncEnabled() || + if (!ReplicationCoordinator::get(opCtx)->getSettings().isAutoResyncEnabled() || !ReplSource::throttledForceResyncDead(opCtx, "auto")) { log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl; @@ -1348,7 +1349,7 @@ static void replSlaveThread() { } void startMasterSlave(OperationContext* opCtx) { - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + const ReplSettings& replSettings = ReplicationCoordinator::get(opCtx)->getSettings(); if (!replSettings.isSlave() && !replSettings.isMaster()) return; diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 84c41df95ba..81d56aadac6 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -136,7 +136,7 @@ class ReplSource { bool _connect(OplogReader* reader, const HostAndPort& host, const OID& myRID); - Status _updateIfDoneWithInitialSync(); + Status _updateIfDoneWithInitialSync(OperationContext* opCtx); public: OplogReader oplogReader; diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp index d690484b0ce..95dd56ea92b 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp @@ -81,7 +81,7 @@ void MockReplCoordServerFixture::setUp() { ASSERT_TRUE( client.createCollection(NamespaceString::kRsOplogNamespace.ns(), 1024 * 1024, true)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::acquireOplogCollectionForLogging(opCtx()); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0c7c5e29efd..0deb0def76f 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -218,8 +218,8 @@ private: } // namespace -void setOplogCollectionName() { - switch (getGlobalReplicationCoordinator()->getReplicationMode()) { +void setOplogCollectionName(ServiceContext* service) { + switch (ReplicationCoordinator::get(service)->getReplicationMode()) { case ReplicationCoordinator::modeReplSet: _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); break; diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 6f773694945..6a2de1c6bc4 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -245,7 +245,7 @@ void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime); /** * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. */ -void setOplogCollectionName(); +void setOplogCollectionName(ServiceContext* service); /** * Signal any waiting AwaitData queries on the oplog that there is new data or metadata available. diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 7666e026955..9a38b77d489 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -67,7 +67,7 @@ void OplogTest::setUp() { // Set up ReplicationCoordinator and create oplog. ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service)); - setOplogCollectionName(); + setOplogCollectionName(service); createOplog(opCtx.get()); // Ensure that we are primary. diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index e2d28cadb6f..f9917d98177 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -167,7 +167,7 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -196,7 +196,7 @@ public: if (cmdObj["forShell"].trueValue()) LastError::get(opCtx->getClient()).disable(); - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -211,7 +211,8 @@ public: if (includeInitialSync) { responseStyle = ReplicationCoordinator::ReplSetGetStatusResponseStyle::kInitialSync; } - status = getGlobalReplicationCoordinator()->processReplSetGetStatus(&result, responseStyle); + status = + ReplicationCoordinator::get(opCtx)->processReplSetGetStatus(&result, responseStyle); return CommandHelpers::appendCommandStatus(result, status); } @@ -233,11 +234,11 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); - getGlobalReplicationCoordinator()->processReplSetGetConfig(&result); + ReplicationCoordinator::get(opCtx)->processReplSetGetConfig(&result); return true; } @@ -403,7 +404,7 @@ public: } Status status = - getGlobalReplicationCoordinator()->processReplSetInitiate(opCtx, configObj, &result); + ReplicationCoordinator::get(opCtx)->processReplSetInitiate(opCtx, configObj, &result); return CommandHelpers::appendCommandStatus(result, status); } @@ -425,7 +426,7 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) { return CommandHelpers::appendCommandStatus(result, status); } @@ -439,7 +440,7 @@ public: parsedArgs.newConfigObj = cmdObj["replSetReconfig"].Obj(); parsedArgs.force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); status = - getGlobalReplicationCoordinator()->processReplSetReconfig(opCtx, parsedArgs, &result); + ReplicationCoordinator::get(opCtx)->processReplSetReconfig(opCtx, parsedArgs, &result); Lock::GlobalWrite globalWrite(opCtx); @@ -482,13 +483,13 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); int secs = (int)cmdObj.firstElement().numberInt(); return CommandHelpers::appendCommandStatus( - result, getGlobalReplicationCoordinator()->processReplSetFreeze(secs, &result)); + result, ReplicationCoordinator::get(opCtx)->processReplSetFreeze(secs, &result)); } private: @@ -512,7 +513,7 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -554,7 +555,7 @@ public: log() << "Attempting to step down in response to replSetStepDown command"; - status = getGlobalReplicationCoordinator()->stepDown( + status = ReplicationCoordinator::get(opCtx)->stepDown( opCtx, force, Seconds(secondaryCatchUpPeriodSecs), Seconds(stepDownForSecs)); return CommandHelpers::appendCommandStatus(result, status); } @@ -576,13 +577,13 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); return CommandHelpers::appendCommandStatus( result, - getGlobalReplicationCoordinator()->setMaintenanceMode( + ReplicationCoordinator::get(opCtx)->setMaintenanceMode( cmdObj["replSetMaintenance"].trueValue())); } @@ -604,7 +605,7 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -615,7 +616,7 @@ public: return CommandHelpers::appendCommandStatus( result, - getGlobalReplicationCoordinator()->processReplSetSyncFrom( + ReplicationCoordinator::get(opCtx)->processReplSetSyncFrom( opCtx, targetHostAndPort, &result)); } @@ -727,7 +728,7 @@ public: Status status = Status(ErrorCodes::InternalError, "status not set in heartbeat code"); /* we don't call ReplSetCommand::check() here because heartbeat checks many things that are pre-initialization. */ - if (!getGlobalReplicationCoordinator()->getSettings().usingReplSets()) { + if (!ReplicationCoordinator::get(opCtx)->getSettings().usingReplSets()) { status = Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet"); return CommandHelpers::appendCommandStatus(result, status); } @@ -739,7 +740,7 @@ public: status = args.initialize(cmdObj); if (status.isOK()) { ReplSetHeartbeatResponse response; - status = getGlobalReplicationCoordinator()->processHeartbeatV1(args, &response); + status = ReplicationCoordinator::get(opCtx)->processHeartbeatV1(args, &response); if (status.isOK()) response.addToBSON(&result, true); @@ -764,7 +765,7 @@ public: } ReplSetHeartbeatResponse response; - status = getGlobalReplicationCoordinator()->processHeartbeat(args, &response); + status = ReplicationCoordinator::get(opCtx)->processHeartbeat(args, &response); if (status.isOK()) response.addToBSON(&result, false); @@ -785,7 +786,7 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -802,7 +803,7 @@ public: parsedArgs.cfgver = cfgverElement.safeNumberLong(); parsedArgs.opTime = Timestamp(cmdObj["opTime"].Date()); - status = getGlobalReplicationCoordinator()->processReplSetFresh(parsedArgs, &result); + status = ReplicationCoordinator::get(opCtx)->processReplSetFresh(parsedArgs, &result); return CommandHelpers::appendCommandStatus(result, status); } } cmdReplSetFresh; @@ -819,7 +820,7 @@ private: DEV log() << "received elect msg " << cmdObj.toString(); else LOG(2) << "received elect msg " << cmdObj.toString(); - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); @@ -835,7 +836,7 @@ private: parsedArgs.cfgver = cfgverElement.safeNumberLong(); parsedArgs.round = cmdObj["round"].OID(); - status = getGlobalReplicationCoordinator()->processReplSetElect(parsedArgs, &result); + status = ReplicationCoordinator::get(opCtx)->processReplSetElect(parsedArgs, &result); return CommandHelpers::appendCommandStatus(result, status); } } cmdReplSetElect; @@ -848,13 +849,13 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); log() << "Received replSetStepUp request"; - status = getGlobalReplicationCoordinator()->stepUpIfEligible(); + status = ReplicationCoordinator::get(opCtx)->stepUpIfEligible(); if (!status.isOK()) { log() << "replSetStepUp request failed" << causedBy(status); @@ -883,12 +884,12 @@ public: const string&, const BSONObj& cmdObj, BSONObjBuilder& result) override { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) return CommandHelpers::appendCommandStatus(result, status); log() << "Received replSetAbortPrimaryCatchUp request"; - status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded(); + status = ReplicationCoordinator::get(opCtx)->abortCatchupIfNeeded(); if (!status.isOK()) { log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status); } diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 70b7c3d9b4b..1ceda277eb7 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -51,7 +51,7 @@ private: const std::string&, const BSONObj& cmdObj, BSONObjBuilder& result) final { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result); if (!status.isOK()) { return CommandHelpers::appendCommandStatus(result, status); } @@ -63,7 +63,7 @@ private: } ReplSetRequestVotesResponse response; - status = getGlobalReplicationCoordinator()->processReplSetRequestVotes( + status = ReplicationCoordinator::get(opCtx)->processReplSetRequestVotes( opCtx, parsedArgs, &response); response.addToBSON(&result); return CommandHelpers::appendCommandStatus(result, status); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 92cf503c0c3..421f71f48e1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -233,7 +233,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( invariant(!_bgSync); log() << "Starting replication fetcher thread"; _bgSync = stdx::make_unique<BackgroundSync>( - this, _replicationProcess, makeSteadyStateOplogBuffer(opCtx)); + replCoord, this, _replicationProcess, makeSteadyStateOplogBuffer(opCtx)); _bgSync->startup(opCtx); log() << "Starting replication applier thread"; @@ -348,7 +348,7 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) // oplog. We record this update at the 'lastAppliedOpTime'. If there are any outstanding // checkpoints being taken, they should only reflect this write if they see all writes up // to our 'lastAppliedOpTime'. - auto lastAppliedOpTime = repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); + auto lastAppliedOpTime = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); _replicationProcess->getConsistencyMarkers()->clearAppliedThrough( opCtx, lastAppliedOpTime.getTimestamp()); } @@ -466,7 +466,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC // to our 'lastAppliedOpTime'. invariant( _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull()); - auto lastAppliedOpTime = repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); + auto lastAppliedOpTime = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); _replicationProcess->getConsistencyMarkers()->clearAppliedThrough( opCtx, lastAppliedOpTime.getTimestamp()); @@ -912,11 +912,11 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherMaxFetcherRe } JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() { - return repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); + return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTime(); } void ReplicationCoordinatorExternalStateImpl::onDurable(const JournalListener::Token& token) { - repl::getGlobalReplicationCoordinator()->setMyLastDurableOpTimeForward(token); + repl::ReplicationCoordinator::get(_service)->setMyLastDurableOpTimeForward(token); } void ReplicationCoordinatorExternalStateImpl::startNoopWriter(OpTime opTime) { diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 2a772d192dd..b33a9e54d46 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -69,7 +69,7 @@ using std::stringstream; namespace repl { void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int level) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->getSettings().usingReplSets()) { IsMasterResponse isMasterResponse; replCoord->fillIsMasterForReplSet(&isMasterResponse); @@ -87,7 +87,7 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int result.append("info", s); } else { result.appendBool("ismaster", - getGlobalReplicationCoordinator()->isMasterForReportingPurposes()); + ReplicationCoordinator::get(opCtx)->isMasterForReportingPurposes()); } if (level) { @@ -162,7 +162,7 @@ public: } BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const { - if (!getGlobalReplicationCoordinator()->isReplEnabled()) { + if (!ReplicationCoordinator::get(opCtx)->isReplEnabled()) { return BSONObj(); } @@ -189,7 +189,7 @@ public: } BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); if (!replCoord->isReplEnabled()) { return BSONObj(); } diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp index 13cc0ffbbd9..dbcdcbcb203 100644 --- a/src/mongo/db/repl/resync.cpp +++ b/src/mongo/db/repl/resync.cpp @@ -80,8 +80,8 @@ public: bool waitForResync = !cmdObj.hasField(kWaitFieldName) || cmdObj[kWaitFieldName].trueValue(); // Replica set resync. - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (getGlobalReplicationCoordinator()->getSettings().usingReplSets()) { + ReplicationCoordinator* replCoord = ReplicationCoordinator::get(opCtx); + if (replCoord->getSettings().usingReplSets()) { // Resync is disabled in production on replica sets until it stabilizes (SERVER-27081). if (!Command::testCommandsEnabled) { return CommandHelpers::appendCommandStatus( diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 2731343178f..ed38f2453b4 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -81,7 +81,7 @@ void RollbackTest::setUp() { _coordinator = new ReplicationCoordinatorRollbackMock(serviceContext); ReplicationCoordinator::set(serviceContext, std::unique_ptr<ReplicationCoordinator>(_coordinator)); - setOplogCollectionName(); + setOplogCollectionName(serviceContext); SessionCatalog::create(serviceContext); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index f101cfe59d8..eec7a5ea456 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -70,7 +70,7 @@ void SyncTailTest::setUp() { StorageInterface::set(service, std::move(storageInterface)); DropPendingCollectionReaper::set( service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface)); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(_opCtx.get()); service->setOpObserver(stdx::make_unique<OpObserverImpl>()); diff --git a/src/mongo/db/s/chunk_move_write_concern_options.cpp b/src/mongo/db/s/chunk_move_write_concern_options.cpp index 700f134a604..641571c2e2b 100644 --- a/src/mongo/db/s/chunk_move_write_concern_options.cpp +++ b/src/mongo/db/s/chunk_move_write_concern_options.cpp @@ -50,8 +50,8 @@ const WriteConcernOptions kWriteConcernLocal(1, WriteConcernOptions::SyncMode::NONE, WriteConcernOptions::kNoTimeout); -WriteConcernOptions getDefaultWriteConcernForMigration() { - repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); +WriteConcernOptions getDefaultWriteConcernForMigration(OperationContext* opCtx) { + repl::ReplicationCoordinator* replCoordinator = repl::ReplicationCoordinator::get(opCtx); if (replCoordinator->getReplicationMode() == mongo::repl::ReplicationCoordinator::modeReplSet) { Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(kDefaultWriteConcernForMigration); @@ -85,7 +85,7 @@ StatusWith<WriteConcernOptions> ChunkMoveWriteConcernOptions::getEffectiveWriteC if (options.isWriteConcernSpecified()) { writeConcern = options.getWriteConcern(); - repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); + repl::ReplicationCoordinator* replCoordinator = repl::ReplicationCoordinator::get(opCtx); if (replCoordinator->getReplicationMode() == repl::ReplicationCoordinator::modeMasterSlave && @@ -100,7 +100,7 @@ StatusWith<WriteConcernOptions> ChunkMoveWriteConcernOptions::getEffectiveWriteC return status; } } else { - writeConcern = getDefaultWriteConcernForMigration(); + writeConcern = getDefaultWriteConcernForMigration(opCtx); } if (writeConcern.shouldWaitForOtherNodes() && diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 669a3678862..50a038e26c4 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -575,7 +575,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, Lock::DBLock lk(opCtx, _nss.db(), MODE_X); OldClientWriteContext ctx(opCtx, _nss.ns()); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, _nss)) { setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns() << ": checking if collection exists"); return; @@ -756,7 +756,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, if (writeConcern.shouldWaitForOtherNodes()) { repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication( + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( opCtx, repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), writeConcern); diff --git a/src/mongo/db/s/sharding_egress_metadata_hook_for_mongod.cpp b/src/mongo/db/s/sharding_egress_metadata_hook_for_mongod.cpp index 99603ce0ed4..b87497437bf 100644 --- a/src/mongo/db/s/sharding_egress_metadata_hook_for_mongod.cpp +++ b/src/mongo/db/s/sharding_egress_metadata_hook_for_mongod.cpp @@ -45,7 +45,8 @@ void ShardingEgressMetadataHookForMongod::_saveGLEStats(const BSONObj& metadata, repl::OpTime ShardingEgressMetadataHookForMongod::_getConfigServerOpTime() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return repl::getGlobalReplicationCoordinator()->getCurrentCommittedSnapshotOpTime(); + return repl::ReplicationCoordinator::get(_serviceContext) + ->getCurrentCommittedSnapshotOpTime(); } else { // TODO uncomment as part of SERVER-22663 // invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index 0eb5d8ebaeb..bd8db9f6526 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -121,9 +121,9 @@ private: OperationContext& opCtx = *opCtxPtr; // If part of replSet but not in a readable state (e.g. during initial sync), skip. - if (repl::getGlobalReplicationCoordinator()->getReplicationMode() == + if (repl::ReplicationCoordinator::get(&opCtx)->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !repl::getGlobalReplicationCoordinator()->getMemberState().readable()) + !repl::ReplicationCoordinator::get(&opCtx)->getMemberState().readable()) return; TTLCollectionCache& ttlCollectionCache = TTLCollectionCache::get(getGlobalServiceContext()); @@ -195,7 +195,7 @@ private: return; } - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, collectionNSS)) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, collectionNSS)) { return; } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index bdf6e7a34b0..cf66f4a4299 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -120,7 +120,7 @@ public: getGlobalServiceContext()->setOpObserver(stdx::make_unique<OpObserverImpl>()); - setOplogCollectionName(); + setOplogCollectionName(getGlobalServiceContext()); createOplog(&_opCtx); OldClientWriteContext ctx(&_opCtx, ns()); diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 0380f161b89..0e04a261e92 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -98,7 +98,7 @@ public: getGlobalServiceContext()->setOpObserver(stdx::make_unique<OpObserverImpl>()); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(getGlobalServiceContext()); repl::createOplog(_opCtx); ASSERT_OK(_clock->advanceClusterTime(LogicalTime(Timestamp(1, 0)))); diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index df8dcd88f48..6bbef0dd426 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -143,7 +143,7 @@ void ShardingMongodTestFixture::setUp() { repl::StorageInterface::set(service, std::move(storagePtr)); service->setOpObserver(stdx::make_unique<OpObserverImpl>()); - repl::setOplogCollectionName(); + repl::setOplogCollectionName(service); repl::createOplog(_opCtx.get()); // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to diff --git a/src/mongo/util/signal_handlers.cpp b/src/mongo/util/signal_handlers.cpp index 66ba2c04ed9..ed84a273b91 100644 --- a/src/mongo/util/signal_handlers.cpp +++ b/src/mongo/util/signal_handlers.cpp @@ -187,7 +187,7 @@ void signalProcessingThread(LogFileStatus rotate) { lastSignalTimeSeconds = signalTimeSeconds; fassert(16782, rotateLogs(serverGlobalParams.logRenameOnRotate)); if (rotate == LogFileStatus::kNeedToRotateLogFile) { - logProcessDetailsForLogRotate(); + logProcessDetailsForLogRotate(getGlobalServiceContext()); } break; default: |