diff options
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 10 |
3 files changed, 16 insertions, 16 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 78141e4987e..851c91dac40 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -921,9 +921,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { return; } invariant(!_isCatchingUp); - invariant(!_canAcceptNonLocalWrites); _isWaitingForDrainToComplete = false; - _drainFinishedCond_forTest.notify_all(); + _drainFinishedCond.notify_all(); if (!_getMemberState_inlock().primary()) { // We must have decided not to transition to primary while waiting for the applier to drain. @@ -931,6 +930,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { return; } + invariant(!_canAcceptNonLocalWrites); _canAcceptNonLocalWrites = true; lk.unlock(); @@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { stdx::unique_lock<stdx::mutex> lk(_mutex); auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; }; - if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) { + if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, "Timed out waiting to finish draining applier buffer"); } @@ -2543,12 +2543,10 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { _replicationWaiterList.signalAndRemoveAll_inlock(); // Wake up the optime waiter that is waiting for primary catch-up to finish. _opTimeWaiterList.signalAndRemoveAll_inlock(); - // Clean up primary states. + // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously + // by freshness scan. _canAcceptNonLocalWrites = false; - _isCatchingUp = false; - _isWaitingForDrainToComplete = false; _stepDownPending = false; - _drainFinishedCond_forTest.notify_all(); serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false); result = kActionCloseAllConnections; } else { @@ -2682,6 +2680,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto evhStatus = scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { + _finishCatchUpOplog_inlock(true); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2690,7 +2689,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { LockGuard lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _finishCatchUpOplog_inlock(false); + _finishCatchUpOplog_inlock(true); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2757,11 +2756,10 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca } void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) { + invariant(_isCatchingUp); _isCatchingUp = false; // If the node steps down during the catch-up, we don't go into drain mode. if (startToDrain) { - invariant(_getMemberState_inlock().primary()); - invariant(!_canAcceptNonLocalWrites); invariant(!_isWaitingForDrainToComplete); _isWaitingForDrainToComplete = true; // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 25e448500fd..14b9571b7f4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1222,8 +1222,8 @@ private: // Current ReplicaSet state. MemberState _memberState; // (MX) - // Used to signal threads waiting for changes to _memberState. Only used in testing. - stdx::condition_variable _drainFinishedCond_forTest; // (M) + // Used to signal threads waiting for changes to _memberState. + stdx::condition_variable _drainFinishedCond; // (M) // True if we are waiting for the applier to finish draining. bool _isWaitingForDrainToComplete; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 23b9df8e219..7bbe9000bc7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -1348,12 +1348,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) { ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT_FALSE(getReplCoord()->isCatchingUp()); - ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain()); auto net = getNet(); net->enterNetwork(); net->runUntil(net->now() + config.getCatchUpTimeoutPeriod()); net->exitNetwork(); + ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain()); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary")); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test")); @@ -1378,12 +1377,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT_FALSE(getReplCoord()->isCatchingUp()); - ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain()); auto net = getNet(); net->enterNetwork(); net->runReadyNetworkOperations(); net->exitNetwork(); + auto txn = makeOperationContext(); + // Simulate bgsync signaling replCoord to exit drain mode. + // At this point, we see the stepdown and reset the states. + getReplCoord()->signalDrainComplete(txn.get()); + ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain()); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test")); |