diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 4b796ccd2ee..8925e576199 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -951,6 +951,11 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, return Status::OK(); } +ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { + stdx::lock_guard<Latch> lk(_mutex); + return _applierState; +} + void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) { // This logic is a little complicated in order to avoid acquiring the RSTL in mode X @@ -980,7 +985,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, invariant(opCtx->writesAreReplicated()); stdx::unique_lock<Latch> lk(_mutex); - if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining) { + if (_applierState != ApplierState::Draining) { return; } lk.unlock(); @@ -996,11 +1001,11 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the // current term, and we're allowed to become the write master. - if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining || + if (_applierState != ApplierState::Draining || !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) { return; } - _externalState->setApplierState(OplogApplier::ApplierState::Stopped); + _applierState = ApplierState::Stopped; invariant(_getMemberState_inlock().primary()); invariant(!_readWriteAbility->canAcceptNonLocalWrites(opCtx)); @@ -1042,9 +1047,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { } stdx::unique_lock<Latch> lk(_mutex); - auto pred = [this]() { - return _externalState->getApplierState() != OplogApplier::ApplierState::Draining; - }; + auto pred = [this]() { return _applierState != ApplierState::Draining; }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, "Timed out waiting to finish draining applier buffer"); @@ -2844,7 +2847,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l _catchupState->abort_inlock(PrimaryCatchUpConclusionReason::kFailedWithError); } } - _externalState->setApplierState(OplogApplier::ApplierState::Running); + _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -3147,7 +3150,7 @@ boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() { } void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { - _externalState->setApplierState(OplogApplier::ApplierState::Draining); + _applierState = ApplierState::Draining; _externalState->stopProducer(); } |