author      Siyuan Zhou <siyuan.zhou@mongodb.com>    2016-11-15 02:30:37 -0500
committer   Siyuan Zhou <siyuan.zhou@mongodb.com>    2016-11-15 19:15:28 -0500
commit      6904d0ac5ea4bba1822103eb4e7a623cc81de641 (patch)
tree        6b1004234e10fb019ad4a2c466ddbb5260cfdd95 /src/mongo/db
parent      c91332d172d1a0f5325199a84150a1681f7fd863 (diff)
download    mongo-6904d0ac5ea4bba1822103eb4e7a623cc81de641.tar.gz
SERVER-26403 Clean primary states on stepdown
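The gist of the change: when the member state leaves PRIMARY, `_updateMemberStateFromTopologyCoordinator_inlock` now resets the primary-only flags (`_canAcceptNonLocalWrites`, `_isCatchingUp`, `_isWaitingForDrainToComplete`) itself and signals the drain condition variable, rather than leaving the catch-up and drain flags to be cleaned up asynchronously by the freshness scan. The sketch below illustrates that pattern with plain standard-library primitives; it is a simplified, hypothetical stand-in (the class and method names are invented here), not the real `ReplicationCoordinatorImpl`.

```cpp
// Minimal sketch (hypothetical names, not MongoDB code): primary-only state
// guarded by a mutex, cleared in one place when the node steps down, with the
// drain condition variable notified so blocked waiters re-check their predicate.
#include <cassert>
#include <condition_variable>
#include <mutex>

class StepDownCleanupSketch {
public:
    // Called from the state-transition path when leaving PRIMARY.
    void onStepDown() {
        std::lock_guard<std::mutex> lk(_mutex);
        _canAcceptNonLocalWrites = false;      // stop accepting user writes
        _isCatchingUp = false;                 // abandon any primary catch-up
        _isWaitingForDrainToComplete = false;  // no applier drain is pending anymore
        _drainFinishedCond.notify_all();       // wake anyone waiting on these flags
    }

    bool canAcceptNonLocalWrites() {
        std::lock_guard<std::mutex> lk(_mutex);
        return _canAcceptNonLocalWrites;
    }

private:
    std::mutex _mutex;
    std::condition_variable _drainFinishedCond;
    bool _canAcceptNonLocalWrites = true;  // pretend the node was a writable primary
    bool _isCatchingUp = false;
    bool _isWaitingForDrainToComplete = false;
};

int main() {
    StepDownCleanupSketch coord;
    coord.onStepDown();
    assert(!coord.canAcceptNonLocalWrites());  // all primary-only state is gone
}
```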
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 18
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h | 4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 10
3 files changed, 16 insertions, 16 deletions
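One behavioural detail worth noting before the diff: `_finishCatchUpOplog_inlock` no longer asserts `_isCatchingUp` on entry, since a concurrent stepdown may already have cleared it; the cancelled-callback path now passes `startToDrain = false`; and entering drain mode is guarded by new invariants that the node is still primary and is not yet accepting non-local writes. A rough sketch of that control flow follows, with invented type and function names and plain `assert` standing in for `invariant`.

```cpp
// Sketch only (hypothetical names): the catch-up completion path after this
// change. Drain mode is entered only if the node is still primary; if it has
// stepped down in the meantime, the flags were already reset and we just return.
#include <cassert>

enum class MemberState { kPrimary, kSecondary };

struct CatchUpState {  // stand-in for the coordinator's flags
    MemberState memberState = MemberState::kPrimary;
    bool canAcceptNonLocalWrites = false;
    bool isCatchingUp = true;
    bool isWaitingForDrainToComplete = false;
};

// Roughly the shape of _finishCatchUpOplog_inlock(bool startToDrain).
void finishCatchUp(CatchUpState& s, bool startToDrain) {
    // No assert on isCatchingUp: a concurrent stepdown may already have cleared it.
    s.isCatchingUp = false;
    // If the node stepped down during catch-up, the caller passes false and we
    // never enter drain mode.
    if (startToDrain) {
        assert(s.memberState == MemberState::kPrimary);
        assert(!s.canAcceptNonLocalWrites);
        assert(!s.isWaitingForDrainToComplete);
        s.isWaitingForDrainToComplete = true;
    }
}

int main() {
    CatchUpState stillPrimary;
    finishCatchUp(stillPrimary, /*startToDrain=*/true);
    assert(stillPrimary.isWaitingForDrainToComplete);

    CatchUpState steppedDown;
    steppedDown.memberState = MemberState::kSecondary;
    steppedDown.isCatchingUp = false;  // stepdown already cleaned this up
    finishCatchUp(steppedDown, /*startToDrain=*/false);
    assert(!steppedDown.isWaitingForDrainToComplete);
}
```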
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3de98571a7c..526b03d9b38 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -921,8 +921,9 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
         return;
     }
     invariant(!_isCatchingUp);
+    invariant(!_canAcceptNonLocalWrites);
     _isWaitingForDrainToComplete = false;
-    _drainFinishedCond.notify_all();
+    _drainFinishedCond_forTest.notify_all();
 
     if (!_getMemberState_inlock().primary()) {
         // We must have decided not to transition to primary while waiting for the applier to drain.
@@ -930,7 +931,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
         return;
     }
 
-    invariant(!_canAcceptNonLocalWrites);
     _canAcceptNonLocalWrites = true;
 
     lk.unlock();
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
 
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
-    if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
+    if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
         return Status(ErrorCodes::ExceededTimeLimit,
                       "Timed out waiting to finish draining applier buffer");
     }
@@ -2516,9 +2516,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
         _replicationWaiterList.signalAndRemoveAll_inlock();
         // Wake up the optime waiter that is waiting for primary catch-up to finish.
         _opTimeWaiterList.signalAndRemoveAll_inlock();
-        // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
-        // by freshness scan.
+        // Clean up primary states.
         _canAcceptNonLocalWrites = false;
+        _isCatchingUp = false;
+        _isWaitingForDrainToComplete = false;
+        _drainFinishedCond_forTest.notify_all();
         serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
         result = kActionCloseAllConnections;
     } else {
@@ -2652,7 +2654,6 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
     auto evhStatus = scanner->start(
         &_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
     if (evhStatus == ErrorCodes::ShutdownInProgress) {
-        _finishCatchUpOplog_inlock(true);
         return;
     }
     fassertStatusOK(40254, evhStatus.getStatus());
@@ -2661,7 +2662,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
         evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
             LockGuard lk(_mutex);
             if (cbData.status == ErrorCodes::CallbackCanceled) {
-                _finishCatchUpOplog_inlock(true);
+                _finishCatchUpOplog_inlock(false);
                 return;
             }
             auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2728,10 +2729,11 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
 }
 
 void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
-    invariant(_isCatchingUp);
     _isCatchingUp = false;
     // If the node steps down during the catch-up, we don't go into drain mode.
     if (startToDrain) {
+        invariant(_getMemberState_inlock().primary());
+        invariant(!_canAcceptNonLocalWrites);
         invariant(!_isWaitingForDrainToComplete);
         _isWaitingForDrainToComplete = true;
         // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index d2d95ea44aa..8597f9c21ce 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1209,8 +1209,8 @@ private:
     // Current ReplicaSet state.
     MemberState _memberState;  // (MX)
 
-    // Used to signal threads waiting for changes to _memberState.
-    stdx::condition_variable _drainFinishedCond;  // (M)
+    // Used to signal threads waiting for changes to _memberState. Only used in testing.
+    stdx::condition_variable _drainFinishedCond_forTest;  // (M)
 
     // True if we are waiting for the applier to finish draining.
     bool _isWaitingForDrainToComplete;  // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 7bbe9000bc7..23b9df8e219 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -1348,11 +1348,12 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
     ASSERT_TRUE(evh.isValid());
     getReplExec()->waitForEvent(evh);
     ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+    ASSERT_FALSE(getReplCoord()->isCatchingUp());
+    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
     auto net = getNet();
     net->enterNetwork();
     net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
     net->exitNetwork();
-    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
     stopCapturingLogMessages();
     ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
     ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
@@ -1377,15 +1378,12 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
     ASSERT_TRUE(evh.isValid());
     getReplExec()->waitForEvent(evh);
     ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+    ASSERT_FALSE(getReplCoord()->isCatchingUp());
+    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
     auto net = getNet();
     net->enterNetwork();
     net->runReadyNetworkOperations();
     net->exitNetwork();
-    auto txn = makeOperationContext();
-    // Simulate bgsync signaling replCoord to exit drain mode.
-    // At this point, we see the stepdown and reset the states.
-    getReplCoord()->signalDrainComplete(txn.get());
-    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
     stopCapturingLogMessages();
     ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
     ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
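The test changes follow the same contract: immediately after the stepdown takes effect, `isCatchingUp()` and `isWaitingForApplierToDrain()` already report false, so `PrimaryStepsDownDuringCatchUp` no longer needs to simulate bgsync calling `signalDrainComplete` to reset the state. The self-contained sketch below uses invented names (it is not the real test fixture or coordinator) to show the behaviour the updated tests rely on: a thread blocked waiting for drain to finish is released by the stepdown cleanup alone.

```cpp
// Self-contained sketch (hypothetical names, not the real test fixture): a
// thread blocked waiting for drain to finish is woken as soon as stepdown
// clears the flags and notifies, with no extra signalDrainComplete() call.
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

struct DrainState {
    std::mutex mutex;
    std::condition_variable drainFinishedCond;
    bool isCatchingUp = true;
    bool isWaitingForDrainToComplete = false;

    // Stepdown path: clear the flags and notify while the state is consistent.
    void stepDown() {
        std::lock_guard<std::mutex> lk(mutex);
        isCatchingUp = false;
        isWaitingForDrainToComplete = false;
        drainFinishedCond.notify_all();
    }

    // Waiter: returns true once neither catch-up nor drain is in progress.
    bool waitForDrainFinish(std::chrono::seconds timeout) {
        std::unique_lock<std::mutex> lk(mutex);
        return drainFinishedCond.wait_for(
            lk, timeout, [this] { return !isCatchingUp && !isWaitingForDrainToComplete; });
    }
};

int main() {
    DrainState state;
    bool woke = false;
    std::thread waiter([&] { woke = state.waitForDrainFinish(std::chrono::seconds(5)); });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let the waiter block
    state.stepDown();  // the cleanup alone is enough to release the waiter
    waiter.join();
    assert(woke);
    // Mirrors the updated test assertions: the flags read false right after stepdown.
    assert(!state.isCatchingUp && !state.isWaitingForDrainToComplete);
}
```

As the removed test comment indicates, the old behaviour required an explicit `signalDrainComplete` call to observe the stepdown and reset the flags; after this commit the stepdown path itself performs the cleanup and the notification.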