diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 257 |
1 files changed, 129 insertions, 128 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 70c74cc8942..36d8a84fd06 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -195,46 +195,46 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl } ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} -bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationContext* txn) { - return _storageInterface->getInitialSyncFlag(txn); +bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationContext* opCtx) { + return _storageInterface->getInitialSyncFlag(opCtx); } void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) { - _initialSyncRunner.schedule([finished, this](OperationContext* txn, const Status& status) { + _initialSyncRunner.schedule([finished, this](OperationContext* opCtx, const Status& status) { if (status == ErrorCodes::CallbackCanceled) { return TaskRunner::NextAction::kDisposeOperationContext; } // Do initial sync. - syncDoInitialSync(txn, this); - finished(txn); + syncDoInitialSync(opCtx, this); + finished(opCtx); return TaskRunner::NextAction::kDisposeOperationContext; }); } void ReplicationCoordinatorExternalStateImpl::runOnInitialSyncThread( - stdx::function<void(OperationContext* txn)> run) { + stdx::function<void(OperationContext* opCtx)> run) { _initialSyncRunner.cancel(); _initialSyncRunner.join(); - _initialSyncRunner.schedule([run, this](OperationContext* txn, const Status& status) { + _initialSyncRunner.schedule([run, this](OperationContext* opCtx, const Status& status) { if (status == ErrorCodes::CallbackCanceled) { return TaskRunner::NextAction::kDisposeOperationContext; } - invariant(txn); - invariant(txn->getClient()); - run(txn); + invariant(opCtx); + invariant(opCtx->getClient()); + run(opCtx); return TaskRunner::NextAction::kDisposeOperationContext; }); } void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( - OperationContext* txn, ReplicationCoordinator* replCoord) { + OperationContext* opCtx, ReplicationCoordinator* replCoord) { LockGuard lk(_threadMutex); invariant(replCoord); invariant(!_bgSync); log() << "Starting replication fetcher thread"; - _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(txn)); - _bgSync->startup(txn); + _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(opCtx)); + _bgSync->startup(opCtx); log() << "Starting replication applier thread"; invariant(!_applierThread); @@ -246,12 +246,12 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( &SyncSourceFeedback::run, &_syncSourceFeedback, _taskExecutor.get(), _bgSync.get()))); } -void ReplicationCoordinatorExternalStateImpl::stopDataReplication(OperationContext* txn) { +void ReplicationCoordinatorExternalStateImpl::stopDataReplication(OperationContext* opCtx) { UniqueLock lk(_threadMutex); - _stopDataReplication_inlock(txn, &lk); + _stopDataReplication_inlock(opCtx, &lk); } -void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(OperationContext* txn, +void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(OperationContext* opCtx, UniqueLock* lock) { // Make sue no other _stopDataReplication calls are in progress. _dataReplicationStopped.wait(*lock, [this]() { return !_stoppingDataReplication; }); @@ -270,7 +270,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat if (oldBgSync) { log() << "Stopping replication fetcher thread"; - oldBgSync->shutdown(txn); + oldBgSync->shutdown(opCtx); } if (oldApplier) { @@ -279,7 +279,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat } if (oldBgSync) { - oldBgSync->join(txn); + oldBgSync->join(opCtx); } _initialSyncRunner.cancel(); @@ -320,25 +320,25 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s _startedThreads = true; } -void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { - repl::startMasterSlave(txn); +void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* opCtx) { + repl::startMasterSlave(opCtx); } -void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { +void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) { UniqueLock lk(_threadMutex); if (_startedThreads) { - _stopDataReplication_inlock(txn, &lk); + _stopDataReplication_inlock(opCtx, &lk); if (_snapshotThread) { log() << "Stopping replication snapshot thread"; _snapshotThread->shutdown(); } - if (_storageInterface->getOplogDeleteFromPoint(txn).isNull() && - loadLastOpTime(txn) == _storageInterface->getAppliedThrough(txn)) { + if (_storageInterface->getOplogDeleteFromPoint(opCtx).isNull() && + loadLastOpTime(opCtx) == _storageInterface->getAppliedThrough(opCtx)) { // Clear the appliedThrough marker to indicate we are consistent with the top of the // oplog. - _storageInterface->setAppliedThrough(txn, {}); + _storageInterface->setAppliedThrough(opCtx, {}); } if (_noopWriter) { @@ -361,95 +361,95 @@ OldThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() co return _writerPool.get(); } -Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* txn) { +Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* opCtx) { try { - ScopedTransaction scopedXact(txn, MODE_X); - Lock::GlobalWrite globalWrite(txn->lockState()); + ScopedTransaction scopedXact(opCtx, MODE_X); + Lock::GlobalWrite globalWrite(opCtx->lockState()); StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); if (!engine->isMmapV1()) { return Status::OK(); } - txn->setReplicatedWrites(false); - Status status = repairDatabase(txn, engine, localDbName, false, false); + opCtx->setReplicatedWrites(false); + Status status = repairDatabase(opCtx, engine, localDbName, false, false); // Open database before returning - dbHolder().openDb(txn, localDbName); + dbHolder().openDb(opCtx, localDbName); } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } -Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(OperationContext* txn, +Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config) { try { - createOplog(txn); + createOplog(opCtx); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); - Lock::GlobalWrite globalWrite(txn->lockState()); + ScopedTransaction scopedXact(opCtx, MODE_X); + Lock::GlobalWrite globalWrite(opCtx->lockState()); - WriteUnitOfWork wuow(txn); - Helpers::putSingleton(txn, configCollectionName, config); + WriteUnitOfWork wuow(opCtx); + Helpers::putSingleton(opCtx, configCollectionName, config); const auto msgObj = BSON("msg" << "initiating set"); - getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, msgObj); + getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs"); // This initializes the minvalid document with a null "ts" because older versions (<=3.2) // get angry if the minValid document is present but doesn't have a "ts" field. // Consider removing this once we no longer need to support downgrading to 3.2. - _storageInterface->setMinValidToAtLeast(txn, {}); + _storageInterface->setMinValidToAtLeast(opCtx, {}); - FeatureCompatibilityVersion::setIfCleanStartup(txn, _storageInterface); + FeatureCompatibilityVersion::setIfCleanStartup(opCtx, _storageInterface); } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } -void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); // If this is a config server node becoming a primary, ensure the balancer is ready to start. if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { // We must ensure the balancer has stopped because it may still be in the process of // stopping if this node was previously primary. - Balancer::get(txn)->waitForBalancerToStop(); + Balancer::get(opCtx)->waitForBalancerToStop(); } } -OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn, +OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* opCtx, bool isV1ElectionProtocol) { - invariant(txn->lockState()->isW()); + invariant(opCtx->lockState()->isW()); // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be // done before we add anything to our oplog. - invariant(_storageInterface->getOplogDeleteFromPoint(txn).isNull()); - _storageInterface->setAppliedThrough(txn, {}); + invariant(_storageInterface->getOplogDeleteFromPoint(opCtx).isNull()); + _storageInterface->setAppliedThrough(opCtx, {}); if (isV1ElectionProtocol) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_X); - WriteUnitOfWork wuow(txn); - txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage( - txn, + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + opCtx, BSON("msg" << "new primary")); wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "logging transition to primary to oplog", "local.oplog.rs"); + opCtx, "logging transition to primary to oplog", "local.oplog.rs"); } - const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn)); + const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(opCtx)); - _shardingOnTransitionToPrimaryHook(txn); - _dropAllTempCollections(txn); + _shardingOnTransitionToPrimaryHook(opCtx); + _dropAllTempCollections(opCtx); serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(true); @@ -460,28 +460,28 @@ void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { _syncSourceFeedback.forwardSlaveProgress(); } -OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { +OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* opCtx) { std::string myname = getHostName(); OID myRID; { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock lock(opCtx->lockState(), meDatabaseName, MODE_X); BSONObj me; // local.me is an identifier for a server for getLastError w:2+ // TODO: handle WriteConflictExceptions below - if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") || + if (!Helpers::getSingleton(opCtx, meCollectionName, me) || !me.hasField("host") || me["host"].String() != myname) { myRID = OID::gen(); // clean out local.me - Helpers::emptyCollection(txn, meCollectionName); + Helpers::emptyCollection(opCtx, meCollectionName); // repopulate BSONObjBuilder b; b.append("_id", myRID); b.append("host", myname); - Helpers::putSingleton(txn, meCollectionName, b.done()); + Helpers::putSingleton(opCtx, meCollectionName, b.done()); } else { myRID = me["_id"].OID(); } @@ -490,11 +490,11 @@ OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { } StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( - OperationContext* txn) { + OperationContext* opCtx) { try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { BSONObj config; - if (!Helpers::getSingleton(txn, configCollectionName, config)) { + if (!Helpers::getSingleton(opCtx, configCollectionName, config)) { return StatusWith<BSONObj>( ErrorCodes::NoMatchingDocument, str::stream() << "Did not find replica set configuration document in " @@ -502,33 +502,33 @@ StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocu } return StatusWith<BSONObj>(config); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "load replica set config", configCollectionName); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "load replica set config", configCollectionName); } catch (const DBException& ex) { return StatusWith<BSONObj>(ex.toStatus()); } } -Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* txn, +Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* opCtx, const BSONObj& config) { try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); - Helpers::putSingleton(txn, configCollectionName, config); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbWriteLock(opCtx->lockState(), configDatabaseName, MODE_X); + Helpers::putSingleton(opCtx, configCollectionName, config); return Status::OK(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "save replica set config", configCollectionName); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "save replica set config", configCollectionName); } catch (const DBException& ex) { return ex.toStatus(); } } StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument( - OperationContext* txn) { + OperationContext* opCtx) { try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { BSONObj lastVoteObj; - if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) { + if (!Helpers::getSingleton(opCtx, lastVoteCollectionName, lastVoteObj)) { return StatusWith<LastVote>(ErrorCodes::NoMatchingDocument, str::stream() << "Did not find replica set lastVote document in " @@ -537,41 +537,41 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteD return LastVote::readFromLastVote(lastVoteObj); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "load replica set lastVote", lastVoteCollectionName); + opCtx, "load replica set lastVote", lastVoteCollectionName); } catch (const DBException& ex) { return StatusWith<LastVote>(ex.toStatus()); } } Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( - OperationContext* txn, const LastVote& lastVote) { + OperationContext* opCtx, const LastVote& lastVote) { BSONObj lastVoteObj = lastVote.toBSON(); try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock dbWriteLock(opCtx->lockState(), lastVoteDatabaseName, MODE_X); // If there is no last vote document, we want to store one. Otherwise, we only want to // replace it if the new last vote document would have a higher term. We both check // the term of the current last vote document and insert the new document under the // DBLock to synchronize the two operations. BSONObj result; - bool exists = Helpers::getSingleton(txn, lastVoteCollectionName, result); + bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result); if (!exists) { - Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj); + Helpers::putSingleton(opCtx, lastVoteCollectionName, lastVoteObj); } else { StatusWith<LastVote> oldLastVoteDoc = LastVote::readFromLastVote(result); if (!oldLastVoteDoc.isOK()) { return oldLastVoteDoc.getStatus(); } if (lastVote.getTerm() > oldLastVoteDoc.getValue().getTerm()) { - Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj); + Helpers::putSingleton(opCtx, lastVoteCollectionName, lastVoteObj); } } } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "save replica set lastVote", lastVoteCollectionName); - txn->recoveryUnit()->waitUntilDurable(); + opCtx, "save replica set lastVote", lastVoteCollectionName); + opCtx->recoveryUnit()->waitUntilDurable(); return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); @@ -583,18 +583,18 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext* setNewTimestamp(ctx, newTime); } -void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) { - if (_storageInterface->getInitialSyncFlag(txn)) { +void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) { + if (_storageInterface->getInitialSyncFlag(opCtx)) { return; // Initial Sync will take over so no cleanup is needed. } // This initializes the minvalid document with a null "ts" because older versions (<=3.2) // get angry if the minValid document is present but doesn't have a "ts" field. // Consider removing this once we no longer need to support downgrading to 3.2. - _storageInterface->setMinValidToAtLeast(txn, {}); + _storageInterface->setMinValidToAtLeast(opCtx, {}); - const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(txn); - const auto appliedThrough = _storageInterface->getAppliedThrough(txn); + const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(opCtx); + const auto appliedThrough = _storageInterface->getAppliedThrough(opCtx); const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() && // This version should never have a non-null deleteFromPoint with a null appliedThrough. @@ -609,9 +609,9 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon !(appliedThrough.getTimestamp() >= deleteFromPoint); if (needToDeleteEndOfOplog) { log() << "Removing unapplied entries starting at: " << deleteFromPoint; - truncateOplogTo(txn, deleteFromPoint); + truncateOplogTo(opCtx, deleteFromPoint); } - _storageInterface->setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint + _storageInterface->setOplogDeleteFromPoint(opCtx, {}); // clear the deleteFromPoint if (appliedThrough.isNull()) { // No follow-up work to do. @@ -620,7 +620,7 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon // Check if we have any unapplied ops in our oplog. It is important that this is done after // deleting the ragged end of the oplog. - const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn)); + const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx)); if (appliedThrough == topOfOplog) { return; // We've applied all the valid oplog we have. } else if (appliedThrough > topOfOplog) { @@ -632,7 +632,7 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " << topOfOplog << " (inclusive)."; - DBDirectClient db(txn); + DBDirectClient db(opCtx); auto cursor = db.query(rsOplogName, QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), /*batchSize*/ 0, @@ -658,28 +658,29 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon } // Apply remaining ops one at at time, but don't log them because they are already logged. - const bool wereWritesReplicated = txn->writesAreReplicated(); - ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); }); - txn->setReplicatedWrites(false); + const bool wereWritesReplicated = opCtx->writesAreReplicated(); + ON_BLOCK_EXIT([&] { opCtx->setReplicatedWrites(wereWritesReplicated); }); + opCtx->setReplicatedWrites(false); while (cursor->more()) { auto entry = cursor->nextSafe(); - fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true)); + fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); _storageInterface->setAppliedThrough( - txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); + opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); } } -StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { +StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( + OperationContext* opCtx) { // TODO: handle WriteConflictExceptions below try { // If we are doing an initial sync do not read from the oplog. - if (_storageInterface->getInitialSyncFlag(txn)) { + if (_storageInterface->getInitialSyncFlag(opCtx)) { return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."}; } BSONObj oplogEntry; - if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { + if (!Helpers::getLast(opCtx, rsOplogName.c_str(), oplogEntry)) { return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument, str::stream() << "Did not find any entries in " << rsOplogName); @@ -711,17 +712,17 @@ bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host, Se } HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( - const OperationContext* txn) { - return HostAndPort(txn->getClient()->clientAddress(true)); + const OperationContext* opCtx) { + return HostAndPort(opCtx->getClient()->clientAddress(true)); } void ReplicationCoordinatorExternalStateImpl::closeConnections() { getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } -void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { - ServiceContext* environment = txn->getServiceContext(); - environment->killAllUserOperations(txn, ErrorCodes::InterruptedDueToReplStateChange); +void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) { + ServiceContext* environment = opCtx->getServiceContext(); + environment->killAllUserOperations(opCtx, ErrorCodes::InterruptedDueToReplStateChange); } void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { @@ -733,8 +734,8 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { } void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook( - OperationContext* txn) { - auto status = ShardingStateRecovery::recover(txn); + OperationContext* opCtx) { + auto status = ShardingStateRecovery::recover(opCtx); if (ErrorCodes::isShutdownError(status.code())) { // Note: callers of this method don't expect exceptions, so throw only unexpected fatal @@ -745,7 +746,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook fassertStatusOK(40107, status); if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - status = Grid::get(txn)->catalogManager()->initializeConfigDatabaseIfNeeded(txn); + status = Grid::get(opCtx)->catalogManager()->initializeConfigDatabaseIfNeeded(opCtx); if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) { if (ErrorCodes::isShutdownError(status.code())) { // Don't fassert if we're mid-shutdown, let the shutdown happen gracefully. @@ -768,8 +769,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook // Since we *just* wrote the cluster ID to the config.version document (via // ShardingCatalogManager::initializeConfigDatabaseIfNeeded), this should always // succeed. - status = ClusterIdentityLoader::get(txn)->loadClusterId( - txn, repl::ReadConcernLevel::kLocalReadConcern); + status = ClusterIdentityLoader::get(opCtx)->loadClusterId( + opCtx, repl::ReadConcernLevel::kLocalReadConcern); if (ErrorCodes::isShutdownError(status.code())) { // Don't fassert if we're mid-shutdown, let the shutdown happen gracefully. @@ -780,20 +781,20 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } // Free any leftover locks from previous instantiations. - auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager(); - distLockManager->unlockAll(txn, distLockManager->getProcessID()); + auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager(); + distLockManager->unlockAll(opCtx, distLockManager->getProcessID()); // If this is a config server node becoming a primary, start the balancer - Balancer::get(txn)->initiateBalancer(txn); + Balancer::get(opCtx)->initiateBalancer(opCtx); // Generate and upsert random 20 byte key for the LogicalClock's TimeProofService. // TODO: SERVER-27768 - } else if (ShardingState::get(txn)->enabled()) { + } else if (ShardingState::get(opCtx)->enabled()) { const auto configsvrConnStr = - Grid::get(txn)->shardRegistry()->getConfigShard()->getConnString(); - auto status = ShardingState::get(txn)->updateShardIdentityConfigString( - txn, configsvrConnStr.toString()); + Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString(); + auto status = ShardingState::get(opCtx)->updateShardIdentityConfigString( + opCtx, configsvrConnStr.toString()); if (!status.isOK()) { warning() << "error encountered while trying to update config connection string to " << configsvrConnStr << causedBy(status); @@ -802,7 +803,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook // There is a slight chance that some stale metadata might have been loaded before the latest // optime has been recovered, so throw out everything that we have up to now - ShardingState::get(txn)->markCollectionsNotShardedAtStepdown(); + ShardingState::get(opCtx)->markCollectionsNotShardedAtStepdown(); } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { @@ -826,7 +827,7 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { } } -void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* txn) { +void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* opCtx) { std::vector<std::string> dbNames; StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->listDatabases(&dbNames); @@ -837,12 +838,12 @@ void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationC if (*it == "local") continue; LOG(2) << "Removing temporary collections from " << *it; - Database* db = dbHolder().get(txn, *it); + Database* db = dbHolder().get(opCtx, *it); // Since we must be holding the global lock during this function, if listDatabases // returned this dbname, we should be able to get a reference to it - it can't have // been dropped. invariant(db); - db->clearTmpCollections(txn); + db->clearTmpCollections(opCtx); } } @@ -857,11 +858,11 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotNa manager->setCommittedSnapshot(newCommitPoint); } -void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* txn, +void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx, SnapshotName name) { auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); invariant(manager); // This should never be called if there is no SnapshotManager. - manager->createSnapshot(txn, name); + manager->createSnapshot(opCtx, name); } void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { @@ -882,18 +883,18 @@ double ReplicationCoordinatorExternalStateImpl::getElectionTimeoutOffsetLimitFra } bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageEngine( - OperationContext* txn) const { - auto storageEngine = txn->getServiceContext()->getGlobalStorageEngine(); + OperationContext* opCtx) const { + auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); // This should never be called if the storage engine has not been initialized. invariant(storageEngine); return storageEngine->getSnapshotManager(); } StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply( - OperationContext* txn, + OperationContext* opCtx, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) { - return repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation); + return repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation); } Status ReplicationCoordinatorExternalStateImpl::multiSyncApply(MultiApplier::OperationPtrs* ops) { @@ -915,20 +916,20 @@ Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( } std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer( - OperationContext* txn) const { + OperationContext* opCtx) const { if (initialSyncOplogBuffer == kCollectionOplogBufferName) { invariant(initialSyncOplogBufferPeekCacheSize >= 0); OplogBufferCollection::Options options; options.peekCacheSize = std::size_t(initialSyncOplogBufferPeekCacheSize); return stdx::make_unique<OplogBufferProxy>( - stdx::make_unique<OplogBufferCollection>(StorageInterface::get(txn), options)); + stdx::make_unique<OplogBufferCollection>(StorageInterface::get(opCtx), options)); } else { return stdx::make_unique<OplogBufferBlockingQueue>(); } } std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeSteadyStateOplogBuffer( - OperationContext* txn) const { + OperationContext* opCtx) const { return stdx::make_unique<OplogBufferBlockingQueue>(); } |