Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  |  18
1 file changed, 8 insertions, 10 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 78141e4987e..851c91dac40 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -921,9 +921,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
         return;
     }
     invariant(!_isCatchingUp);
-    invariant(!_canAcceptNonLocalWrites);
     _isWaitingForDrainToComplete = false;
-    _drainFinishedCond_forTest.notify_all();
+    _drainFinishedCond.notify_all();
 
     if (!_getMemberState_inlock().primary()) {
         // We must have decided not to transition to primary while waiting for the applier to drain.
@@ -931,6 +930,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
         return;
     }
 
+    invariant(!_canAcceptNonLocalWrites);
     _canAcceptNonLocalWrites = true;
 
     lk.unlock();
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
 
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
-    if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
+    if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
         return Status(ErrorCodes::ExceededTimeLimit,
                       "Timed out waiting to finish draining applier buffer");
     }
@@ -2543,12 +2543,10 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
         _replicationWaiterList.signalAndRemoveAll_inlock();
         // Wake up the optime waiter that is waiting for primary catch-up to finish.
         _opTimeWaiterList.signalAndRemoveAll_inlock();
-        // Clean up primary states.
+        // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
+        // by freshness scan.
         _canAcceptNonLocalWrites = false;
-        _isCatchingUp = false;
-        _isWaitingForDrainToComplete = false;
         _stepDownPending = false;
-        _drainFinishedCond_forTest.notify_all();
         serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
         result = kActionCloseAllConnections;
     } else {
@@ -2682,6 +2680,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
     auto evhStatus = scanner->start(
         &_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
     if (evhStatus == ErrorCodes::ShutdownInProgress) {
+        _finishCatchUpOplog_inlock(true);
         return;
     }
     fassertStatusOK(40254, evhStatus.getStatus());
@@ -2690,7 +2689,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
         evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
             LockGuard lk(_mutex);
             if (cbData.status == ErrorCodes::CallbackCanceled) {
-                _finishCatchUpOplog_inlock(false);
+                _finishCatchUpOplog_inlock(true);
                 return;
             }
             auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2757,11 +2756,10 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
 }
 
 void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
+    invariant(_isCatchingUp);
     _isCatchingUp = false;
     // If the node steps down during the catch-up, we don't go into drain mode.
    if (startToDrain) {
-        invariant(_getMemberState_inlock().primary());
-        invariant(!_canAcceptNonLocalWrites);
         invariant(!_isWaitingForDrainToComplete);
         _isWaitingForDrainToComplete = true;
         // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
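
Two themes in the hunks above are worth calling out: the drain handshake now runs through a condition variable that is no longer test-only (_drainFinishedCond, renamed from _drainFinishedCond_forTest), and waitForDrainFinish() blocks on the predicate !_isCatchingUp && !_isWaitingForDrainToComplete. Below is a minimal, self-contained sketch of that wait/notify pattern, not MongoDB's implementation: it uses plain std:: primitives instead of the mongo::stdx:: wrappers, the DrainState type and its member names are illustrative stand-ins, and the real coordinator guards far more state under _mutex.

// Minimal sketch of the drain-finish handshake, assuming only the two
// boolean flags touched by the patch. Waiters block until both
// catch-up and drain are done; any code path that clears a flag must
// notify the condition variable, exactly as the patched functions do.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct DrainState {  // hypothetical stand-in for ReplicationCoordinatorImpl state
    std::mutex mtx;
    std::condition_variable drainFinishedCond;
    bool isCatchingUp = true;
    bool isWaitingForDrainToComplete = true;

    // Mirrors waitForDrainFinish(): time-bounded wait on the predicate.
    bool waitForDrainFinish(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(mtx);
        auto pred = [this] { return !isCatchingUp && !isWaitingForDrainToComplete; };
        return drainFinishedCond.wait_for(lk, timeout, pred);  // false on timeout
    }

    // Mirrors signalDrainComplete(): clear the flag, then wake all waiters.
    void signalDrainComplete() {
        std::lock_guard<std::mutex> lk(mtx);
        isWaitingForDrainToComplete = false;
        drainFinishedCond.notify_all();
    }

    // Mirrors the catch-up side clearing _isCatchingUp.
    void finishCatchUp() {
        std::lock_guard<std::mutex> lk(mtx);
        isCatchingUp = false;
        drainFinishedCond.notify_all();
    }
};

int main() {
    DrainState state;
    std::thread waiter([&] {
        bool ok = state.waitForDrainFinish(std::chrono::seconds(5));
        std::cout << (ok ? "drain finished\n" : "timed out\n");
    });
    state.finishCatchUp();
    state.signalDrainComplete();
    waiter.join();
    return 0;
}

Because every exit path shares one condition variable and one predicate, a notify_all after any flag change is sufficient to wake waiters; the rename away from the _forTest suffix presumably reflects that this wait is now load-bearing outside tests.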
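
The last hunk also adds invariant(_isCatchingUp) to _finishCatchUpOplog_inlock() and, together with the two call-site changes above it, makes the cancellation and shutdown paths pass startToDrain = true. A toy model of the transition those assertions enforce follows; CatchUpModel is a hypothetical stand-in that uses assert in place of MongoDB's invariant and collapses the coordinator's state to the two flags the hunk touches.

// Condensed model of _finishCatchUpOplog_inlock(bool startToDrain):
// it must run exactly once per catch-up attempt, and when asked to
// drain it hands off from the catch-up phase to the applier drain phase.
#include <cassert>

struct CatchUpModel {
    bool isCatchingUp = true;
    bool isWaitingForDrainToComplete = false;

    void finishCatchUpOplog(bool startToDrain) {
        assert(isCatchingUp);  // new in the patch: forbids double-finish
        isCatchingUp = false;
        if (startToDrain) {
            assert(!isWaitingForDrainToComplete);
            isWaitingForDrainToComplete = true;  // enter drain mode
        }
    }
};

int main() {
    CatchUpModel m;
    m.finishCatchUpOplog(/*startToDrain=*/true);  // cancel/shutdown paths now pass true as well
    assert(!m.isCatchingUp && m.isWaitingForDrainToComplete);
    return 0;
}

Read together with the hunk that stops clearing these flags on stepdown, the new comment in _updateMemberStateFromTopologyCoordinator_inlock() indicates the freshness-scan callback is meant to be the single place that cleans up catch-up state asynchronously.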