diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 205 |
1 files changed, 103 insertions, 102 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 8ad99d7e24a..a238cee74de 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -411,8 +411,8 @@ void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolS _replExecutor.appendConnectionStats(stats); } -bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { - StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn); +bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) { + StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(opCtx); if (!lastVote.isOK()) { if (lastVote.getStatus() == ErrorCodes::NoMatchingDocument) { log() << "Did not find local voted for document at startup."; @@ -426,7 +426,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { _topCoord->loadLastVote(lastVote.getValue()); } - StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); + StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx); if (!cfg.isOK()) { log() << "Did not find local replica set configuration document at startup; " << cfg.getStatus(); @@ -443,8 +443,8 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { } // Read the last op from the oplog after cleaning up any partially applied batches. - _externalState->cleanUpLastApplyBatch(txn); - auto lastOpTimeStatus = _externalState->loadLastOpTime(txn); + _externalState->cleanUpLastApplyBatch(opCtx); + auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx); // Use a callback here, because _finishLoadLocalConfig calls isself() which requires // that the server's networking layer be up and running and accepting connections, which @@ -546,12 +546,12 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( _performPostMemberStateUpdateAction(action); if (!isArbiter) { _externalState->startThreads(_settings); - invariant(cbData.txn); - _startDataReplication(cbData.txn); + invariant(cbData.opCtx); + _startDataReplication(cbData.opCtx); } } -void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) { +void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) { std::shared_ptr<DataReplicator> drCopy; { LockGuard lk(_mutex); @@ -569,19 +569,20 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) { } LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling " "ReplCoordExtState::stopDataReplication."; - _externalState->stopDataReplication(txn); + _externalState->stopDataReplication(opCtx); } -void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, +void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, stdx::function<void()> startCompleted) { // Check to see if we need to do an initial sync. const auto lastOpTime = getMyLastAppliedOpTime(); - const auto needsInitialSync = lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(txn); + const auto needsInitialSync = + lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx); if (!needsInitialSync) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_inShutdown) { // Start steady replication, since we already have data. - _externalState->startSteadyStateReplication(txn, this); + _externalState->startSteadyStateReplication(opCtx, this); } return; } @@ -624,9 +625,9 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, startCompleted(); } // Repair local db (to compact it). - auto txn = cc().makeOperationContext(); - uassertStatusOK(_externalState->runRepairOnLocalDB(txn.get())); - _externalState->startSteadyStateReplication(txn.get(), this); + auto opCtx = cc().makeOperationContext(); + uassertStatusOK(_externalState->runRepairOnLocalDB(opCtx.get())); + _externalState->startSteadyStateReplication(opCtx.get(), this); }; std::shared_ptr<DataReplicator> drCopy; @@ -644,7 +645,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, } // DataReplicator::startup() must be called outside lock because it uses features (eg. // setting the initial sync flag) which depend on the ReplicationCoordinatorImpl. - uassertStatusOK(drCopy->startup(txn, numInitialSyncAttempts.load())); + uassertStatusOK(drCopy->startup(opCtx, numInitialSyncAttempts.load())); } catch (...) { auto status = exceptionToStatus(); log() << "Initial Sync failed to start: " << status; @@ -655,19 +656,19 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, fassertFailedWithStatusNoTrace(40354, status); } } else { - _externalState->startInitialSync([this, startCompleted](OperationContext* txn) { + _externalState->startInitialSync([this, startCompleted](OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_inShutdown) { if (startCompleted) { startCompleted(); } - _externalState->startSteadyStateReplication(txn, this); + _externalState->startSteadyStateReplication(opCtx, this); } }); } } -void ReplicationCoordinatorImpl::startup(OperationContext* txn) { +void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { if (!isReplEnabled()) { stdx::lock_guard<stdx::mutex> lk(_mutex); _setConfigState_inlock(kConfigReplicationDisabled); @@ -675,7 +676,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) { } { - OID rid = _externalState->ensureMe(txn); + OID rid = _externalState->ensureMe(opCtx); stdx::lock_guard<stdx::mutex> lk(_mutex); fassert(18822, !_inShutdown); @@ -687,16 +688,16 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) { if (!_settings.usingReplSets()) { // Must be Master/Slave invariant(_settings.isMaster() || _settings.isSlave()); - _externalState->startMasterSlave(txn); + _externalState->startMasterSlave(opCtx); return; } _replExecutor.startup(); _topCoord->setStorageEngineSupportsReadCommitted( - _externalState->isReadCommittedSupportedByStorageEngine(txn)); + _externalState->isReadCommittedSupportedByStorageEngine(opCtx)); - bool doneLoadingConfig = _startLoadLocalConfig(txn); + bool doneLoadingConfig = _startLoadLocalConfig(opCtx); if (doneLoadingConfig) { // If we're not done loading the config, then the config state will be set by // _finishLoadLocalConfig. @@ -706,7 +707,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) { } } -void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { +void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { // Shutdown must: // * prevent new threads from blocking in awaitReplication // * wake up all existing threads blocking in awaitReplication @@ -759,7 +760,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { drCopy->join(); drCopy.reset(); } - _externalState->shutdown(txn); + _externalState->shutdown(opCtx); _replExecutor.shutdown(); _replExecutor.join(); } @@ -892,7 +893,7 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState return _applierState; } -void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, +void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) { // This logic is a little complicated in order to avoid acquiring the global exclusive lock // unnecessarily. This is important because the applier may call signalDrainComplete() @@ -918,7 +919,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, // temp collection isn't introduced on the new primary before we drop all the temp collections. // When we go to drop all temp collections, we must replicate the drops. - invariant(txn->writesAreReplicated()); + invariant(opCtx->writesAreReplicated()); stdx::unique_lock<stdx::mutex> lk(_mutex); if (_applierState != ApplierState::Draining) { @@ -926,7 +927,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, } lk.unlock(); - _externalState->onDrainComplete(txn); + _externalState->onDrainComplete(opCtx); if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) { log() << "transition to primary - " @@ -943,8 +944,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, } } - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite globalWriteLock(txn->lockState()); + ScopedTransaction transaction(opCtx, MODE_X); + Lock::GlobalWrite globalWriteLock(opCtx->lockState()); lk.lock(); // Exit drain mode when the buffer is empty in the current term and we're in Draining mode. @@ -959,7 +960,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, _canAcceptNonLocalWrites = true; lk.unlock(); - _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol())); + _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol())); lk.lock(); // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged @@ -1232,11 +1233,11 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { return _getMyLastDurableOpTime_inlock(); } -Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn, +Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx, const ReadConcernArgs& settings) { // We should never wait for replication if we are holding any locks, because this can // potentially block for long time while doing network activity. - if (txn->lockState()->isLocked()) { + if (opCtx->lockState()->isLocked()) { return {ErrorCodes::IllegalOperation, "Waiting for replication not allowed while holding a lock"}; } @@ -1291,10 +1292,10 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn, // If we are doing a majority read concern we only need to wait for a new snapshot. if (isMajorityReadConcern) { // Wait for a snapshot that meets our needs (< targetOpTime). - LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << txn->getDeadline(); + LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline(); auto waitStatus = - txn->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); + opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); if (!waitStatus.isOK()) { return waitStatus; } @@ -1305,12 +1306,12 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn, // We just need to wait for the opTime to catch up to what we need (not majority RC). stdx::condition_variable condVar; WaiterInfoGuard waitInfo( - &_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar); + &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar); LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " - << txn->getDeadline(); + << opCtx->getDeadline(); - auto waitStatus = txn->waitForConditionOrInterruptNoAssert(condVar, lock); + auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { return waitStatus; } @@ -1591,37 +1592,37 @@ bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock( } ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( - OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern) { + OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); stdx::unique_lock<stdx::mutex> lock(_mutex); auto status = - _awaitReplication_inlock(&lock, txn, opTime, SnapshotName::min(), fixedWriteConcern); + _awaitReplication_inlock(&lock, opCtx, opTime, SnapshotName::min(), fixedWriteConcern); return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient( - OperationContext* txn, const WriteConcernOptions& writeConcern) { + OperationContext* opCtx, const WriteConcernOptions& writeConcern) { Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); stdx::unique_lock<stdx::mutex> lock(_mutex); - const auto& clientInfo = ReplClientInfo::forClient(txn->getClient()); + const auto& clientInfo = ReplClientInfo::forClient(opCtx->getClient()); auto status = _awaitReplication_inlock( - &lock, txn, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern); + &lock, opCtx, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern); return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } Status ReplicationCoordinatorImpl::_awaitReplication_inlock( stdx::unique_lock<stdx::mutex>* lock, - OperationContext* txn, + OperationContext* opCtx, const OpTime& opTime, SnapshotName minSnapshot, const WriteConcernOptions& writeConcern) { // We should never wait for replication if we are holding any locks, because this can // potentially block for long time while doing network activity. - if (txn->lockState()->isLocked()) { + if (opCtx->lockState()->isLocked()) { return {ErrorCodes::IllegalOperation, "Waiting for replication not allowed while holding a lock"}; } @@ -1668,7 +1669,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( return stepdownStatus; } - auto interruptStatus = txn->checkForInterruptNoAssert(); + auto interruptStatus = opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) { return interruptStatus; } @@ -1681,7 +1682,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( } } - auto clockSource = txn->getServiceContext()->getFastClockSource(); + auto clockSource = opCtx->getServiceContext()->getFastClockSource(); const auto wTimeoutDate = [&]() -> const Date_t { if (writeConcern.wDeadline != Date_t::max()) { return writeConcern.wDeadline; @@ -1696,14 +1697,14 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; WaiterInfoGuard waitInfo( - &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar); + &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"}; } - auto status = txn->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate); + auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate); if (!status.isOK()) { return status.getStatus(); } @@ -1729,7 +1730,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); } -Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, +Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, const bool force, const Milliseconds& waitTime, const Milliseconds& stepdownTime) { @@ -1745,12 +1746,12 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, return {ErrorCodes::NotMaster, "not primary so can't step down"}; } - Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly()); + Lock::GlobalLock globalReadLock(opCtx->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly()); // We've requested the global shared lock which will stop new writes from coming in, // but existing writes could take a long time to finish, so kill all user operations // to help us get the global lock faster. - _externalState->killAllUserOperations(txn); + _externalState->killAllUserOperations(opCtx); globalReadLock.waitForLock(durationCount<Milliseconds>(stepdownTime)); @@ -1763,7 +1764,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, try { stdx::unique_lock<stdx::mutex> topoLock(_topoMutex); bool restartHeartbeats = true; - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); while (!_tryToStepDown(waitUntil, stepDownUntil, force)) { if (restartHeartbeats) { // We send out a fresh round of heartbeats because stepping down successfully @@ -1773,7 +1774,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, _restartHeartbeats_inlock(); restartHeartbeats = false; } - txn->waitForConditionOrInterruptUntil( + opCtx->waitForConditionOrInterruptUntil( _stepDownWaiters, topoLock, std::min(stepDownUntil, waitUntil)); } } catch (const DBException& ex) { @@ -1864,14 +1865,14 @@ bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { return false; } -bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* txn, +bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx, StringData dbName) { // The answer isn't meaningful unless we hold the global lock. - invariant(txn->lockState()->isLocked()); - return canAcceptWritesForDatabase_UNSAFE(txn, dbName); + invariant(opCtx->lockState()->isLocked()); + return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName); } -bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* txn, +bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, StringData dbName) { // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes // started with --slave, and adjusted based on primary+drain state in replica sets. @@ -1889,32 +1890,32 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont return !replAllDead && _settings.isMaster(); } -bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* txn, +bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx, const NamespaceString& ns) { - invariant(txn->lockState()->isLocked()); - return canAcceptWritesFor_UNSAFE(txn, ns); + invariant(opCtx->lockState()->isLocked()); + return canAcceptWritesFor_UNSAFE(opCtx, ns); } -bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* txn, +bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opCtx, const NamespaceString& ns) { if (_memberState.rollback() && ns.isOplog()) { return false; } StringData dbName = ns.db(); - return canAcceptWritesForDatabase_UNSAFE(txn, dbName); + return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName); } -Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* txn, +Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* opCtx, const NamespaceString& ns, bool slaveOk) { - invariant(txn->lockState()->isLocked()); - return checkCanServeReadsFor_UNSAFE(txn, ns, slaveOk); + invariant(opCtx->lockState()->isLocked()); + return checkCanServeReadsFor_UNSAFE(opCtx, ns, slaveOk); } -Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* txn, +Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* opCtx, const NamespaceString& ns, bool slaveOk) { - auto client = txn->getClient(); + auto client = opCtx->getClient(); // Oplog reads are not allowed during STARTUP state, but we make an exception for internal // reads and master-slave replication. Internel reads are required for cleaning up unfinished // apply batches. Master-slave never sets the state so we make an exception for it as well. @@ -1928,7 +1929,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext if (client->isInDirectClient()) { return Status::OK(); } - if (canAcceptWritesFor_UNSAFE(txn, ns)) { + if (canAcceptWritesFor_UNSAFE(opCtx, ns)) { return Status::OK(); } if (_settings.isSlave() || _settings.isMaster()) { @@ -1948,9 +1949,9 @@ bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const { return _canServeNonLocalReads.loadRelaxed(); } -bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* txn, +bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* opCtx, const NamespaceString& ns) { - return !canAcceptWritesFor(txn, ns); + return !canAcceptWritesFor(opCtx, ns); } OID ReplicationCoordinatorImpl::getElectionId() { @@ -1977,8 +1978,8 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const { return self.getId(); } -Status ReplicationCoordinatorImpl::resyncData(OperationContext* txn, bool waitUntilCompleted) { - _stopDataReplication(txn); +Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) { + _stopDataReplication(opCtx); auto finishedEvent = uassertStatusOK(_replExecutor.makeEvent()); stdx::function<void()> f; if (waitUntilCompleted) @@ -1987,7 +1988,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* txn, bool waitUn stdx::unique_lock<stdx::mutex> lk(_mutex); _resetMyLastOpTimes_inlock(); lk.unlock(); // unlock before calling into replCoordExtState. - _startDataReplication(txn, f); + _startDataReplication(opCtx, f); if (waitUntilCompleted) { _replExecutor.waitForEvent(finishedEvent); } @@ -2212,7 +2213,7 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { return Status::OK(); } -Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn, +Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCtx, const HostAndPort& target, BSONObjBuilder* resultObj) { Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); @@ -2227,7 +2228,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn, } if (doResync) { - return resyncData(txn, false); + return resyncData(opCtx, false); } return result; @@ -2292,7 +2293,7 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& return result; } -Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, +Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCtx, const ReplSetReconfigArgs& args, BSONObjBuilder* resultObj) { log() << "replSetReconfig admin command received from client"; @@ -2363,7 +2364,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, } StatusWith<int> myIndex = validateConfigForReconfig( - _externalState.get(), oldConfig, newConfig, txn->getServiceContext(), args.force); + _externalState.get(), oldConfig, newConfig, opCtx->getServiceContext(), args.force); if (!myIndex.isOK()) { error() << "replSetReconfig got " << myIndex.getStatus() << " while validating " << newConfigObj; @@ -2382,7 +2383,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, } } - status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON()); + status = _externalState->storeLocalConfigDocument(opCtx, newConfig.toBSON()); if (!status.isOK()) { error() << "replSetReconfig failed to store config document; " << status; return status; @@ -2465,7 +2466,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( } } -Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, +Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx, const BSONObj& configObj, BSONObjBuilder* resultObj) { log() << "replSetInitiate admin command received from client"; @@ -2508,7 +2509,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, } StatusWith<int> myIndex = - validateConfigForInitiate(_externalState.get(), newConfig, txn->getServiceContext()); + validateConfigForInitiate(_externalState.get(), newConfig, opCtx->getServiceContext()); if (!myIndex.isOK()) { error() << "replSet initiate got " << myIndex.getStatus() << " while validating " << configObj; @@ -2525,7 +2526,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, return status; } - status = _externalState->initializeReplSetStorage(txn, newConfig.toBSON()); + status = _externalState->initializeReplSetStorage(opCtx, newConfig.toBSON()); if (!status.isOK()) { error() << "replSetInitiate failed to store config document or create the oplog; " << status; @@ -2545,7 +2546,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, // will fail validation with a "replSet initiate got ... while validating" reason. invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter()); _externalState->startThreads(_settings); - _startDataReplication(txn); + _startDataReplication(opCtx); configStateGuard.Dismiss(); return Status::OK(); @@ -2949,7 +2950,7 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi return status; } -Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn, +Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx, const HandshakeArgs& handshake) { LOG(2) << "Received handshake " << handshake.toBSON(); @@ -2968,7 +2969,7 @@ Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn, SlaveInfo newSlaveInfo; newSlaveInfo.rid = handshake.getRid(); newSlaveInfo.memberId = -1; - newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(txn); + newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(opCtx); // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily. _slaveInfo.push_back(newSlaveInfo); @@ -3121,8 +3122,8 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da host)); } -void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn) { - StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); +void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) { + StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx); OpTime lastOpTime; if (!lastOpTimeStatus.isOK()) { warning() << "Failed to load timestamp of most recently applied operation; " @@ -3137,7 +3138,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn _reportUpstream_inlock(std::move(lock)); // Unlocked below. - _externalState->setGlobalTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp()); + _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp()); } bool ReplicationCoordinatorImpl::shouldChangeSyncSource( @@ -3244,14 +3245,14 @@ OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { } Status ReplicationCoordinatorImpl::processReplSetRequestVotes( - OperationContext* txn, + OperationContext* opCtx, const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) { if (!isV1ElectionProtocol()) { return {ErrorCodes::BadValue, "not using election protocol v1"}; } - auto termStatus = updateTerm(txn, args.getTerm()); + auto termStatus = updateTerm(opCtx, args.getTerm()); if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm) return termStatus; @@ -3264,7 +3265,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( if (!args.isADryRun() && response->getVoteGranted()) { LastVote lastVote{args.getTerm(), args.getCandidateIndex()}; - Status status = _externalState->storeLocalLastVoteDocument(txn, lastVote); + Status status = _externalState->storeLocalLastVoteDocument(opCtx, lastVote); if (!status.isOK()) { error() << "replSetRequestVotes failed to store LastVote document; " << status; return status; @@ -3405,7 +3406,7 @@ EventHandle ReplicationCoordinatorImpl::updateTerm_forTest( return finishEvh; } -Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long term) { +Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long term) { // Term is only valid if we are replicating. if (getReplicationMode() != modeReplSet) { return {ErrorCodes::BadValue, "cannot supply 'term' without active replication"}; @@ -3417,7 +3418,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long t } // Check we haven't acquired any lock, because potential stepdown needs global lock. - dassert(!txn->lockState()->isLocked()); + dassert(!opCtx->lockState()->isLocked()); TopologyCoordinator::UpdateTermResult updateTermResult; EventHandle finishEvh; @@ -3469,12 +3470,12 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback( return EventHandle(); } -SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* txn) { +SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* opCtx) { auto reservedName = SnapshotName(_snapshotNameGenerator.addAndFetch(1)); dassert(reservedName > SnapshotName::min()); dassert(reservedName < SnapshotName::max()); - if (txn) { - ReplClientInfo::forClient(txn->getClient()).setLastSnapshot(reservedName); + if (opCtx) { + ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName); } return reservedName; } @@ -3483,12 +3484,12 @@ void ReplicationCoordinatorImpl::forceSnapshotCreation() { _externalState->forceSnapshotCreation(); } -void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* txn, +void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx, const SnapshotName& untilSnapshot) { stdx::unique_lock<stdx::mutex> lock(_mutex); while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) { - txn->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock); + opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock); } } @@ -3496,11 +3497,11 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { return _uncommittedSnapshotsSize.load(); } -void ReplicationCoordinatorImpl::createSnapshot(OperationContext* txn, +void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx, OpTime timeOfSnapshot, SnapshotName name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - _externalState->createSnapshot(txn, name); + _externalState->createSnapshot(opCtx, name); auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; if (timeOfSnapshot <= _lastCommittedOpTime) { @@ -3588,10 +3589,10 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra if (cbData.status == ErrorCodes::CallbackCanceled) { return; } - invariant(cbData.txn); + invariant(cbData.opCtx); LastVote lastVote{OpTime::kInitialTerm, -1}; - auto status = _externalState->storeLocalLastVoteDocument(cbData.txn, lastVote); + auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote); invariant(status.isOK()); _replExecutor.signalEvent(evh); }); |