diff options
author | Vesselina Ratcheva <vesselina.ratcheva@10gen.com> | 2018-06-21 13:18:09 -0400 |
---|---|---|
committer | Vesselina Ratcheva <vesselina.ratcheva@10gen.com> | 2018-07-02 21:03:07 -0400 |
commit | 925a113194e00e193318486f576d14e6c3e27ea1 (patch) | |
tree | 21e3e838e7d5a91b79a17c6dee2b45d4aedbe8e0 /src/mongo/db/repl | |
parent | f683298f0f85129b4eaf0c16244fe984943f42ce (diff) | |
download | mongo-925a113194e00e193318486f576d14e6c3e27ea1.tar.gz |
SERVER-35058 Do not rely only on heartbeats to signal secondary positions in the stepdown command
Diffstat (limited to 'src/mongo/db/repl')
5 files changed, 208 insertions, 77 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 35875f1bb33..f65975365a1 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -821,7 +821,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { _opTimeWaiterList.signalAll_inlock(); _currentCommittedSnapshotCond.notify_all(); _initialSyncer.swap(initialSyncerCopy); - _stepDownWaiters.notify_all(); } @@ -1605,9 +1604,14 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( } void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_topCoord->isSteppingDown()) { - _stepDownWaiters.wait(lk); + auto isSteppingDown = [&]() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + // If true, we know that a stepdown is underway. + return (_topCoord->isSteppingDown()); + }; + + while (!isSteppingDown()) { + sleepFor(Milliseconds{10}); } } @@ -1662,9 +1666,6 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, return status; } - // Wake up threads blocked in waitForStepDownAttempt_forTest. - _stepDownWaiters.notify_all(); - // Update _canAcceptNonLocalWrites from the TopologyCoordinator now that we're in the middle // of a stepdown attempt. This will prevent us from accepting writes so that if our stepdown // attempt fails later we can release the global lock and go to sleep to allow secondaries to @@ -1705,21 +1706,23 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, try { - bool firstTime = true; - while (!_topCoord->attemptStepDown( - termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) { + auto waitTimeout = std::min(waitTime, stepdownTime); + auto lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); - // The stepdown attempt failed. + // Set up a waiter which will be signalled when we process a heartbeat or updatePosition + // and have a majority of nodes at our optime. + stdx::condition_variable condVar; + const WriteConcernOptions waiterWriteConcern( + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::NONE, waitTimeout); + ThreadWaiter waiter(lastAppliedOpTime, &waiterWriteConcern, &condVar); + WaiterGuard guard(&_replicationWaiterList, &waiter); - if (firstTime) { - // We send out a fresh round of heartbeats because stepping down successfully - // without {force: true} is dependent on timely heartbeat data. - _restartHeartbeats_inlock(); - firstTime = false; - } + while (!_topCoord->attemptStepDown( + termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) { - // Now release the global lock to allow secondaries to read the oplog, then wait until - // enough secondaries are caught up for us to finish stepdown. + // The stepdown attempt failed. We now release the global lock to allow secondaries + // to read the oplog, then wait until enough secondaries are caught up for us to + // finish stepdown. globalLock.reset(); invariant(!opCtx->lockState()->isLocked()); @@ -1748,7 +1751,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, // attemptStepDown again will cause attemptStepDown to return ExceededTimeLimit with // the proper error message. opCtx->waitForConditionOrInterruptUntil( - _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil)); + condVar, lk, std::min(stepDownUntil, waitUntil)); } } catch (const DBException& e) { return e.toStatus(); @@ -1764,12 +1767,6 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, return Status::OK(); } -void ReplicationCoordinatorImpl::_signalStepDownWaiterIfReady_inlock() { - if (_topCoord->isSafeToStepDown()) { - _stepDownWaiters.notify_all(); - } -} - void ReplicationCoordinatorImpl::_handleTimePassing( const executor::TaskExecutor::CallbackArgs& cbData) { if (!cbData.status.isOK()) { @@ -2444,8 +2441,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock( _replicationWaiterList.signalAll_inlock(); // Wake up the optime waiter that is waiting for primary catch-up to finish. _opTimeWaiterList.signalAll_inlock(); - // If there are any pending stepdown command requests wake them up. - _stepDownWaiters.notify_all(); // _canAcceptNonLocalWrites should already be set above. invariant(!_canAcceptNonLocalWrites); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 35eca16fcb3..0416df74823 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -380,7 +380,7 @@ public: void waitForElectionDryRunFinish_forTest(); /** - * Waits until a stepdown command has begun. Callers should ensure that the stepdown attempt + * Waits until a stepdown attempt has begun. Callers should ensure that the stepdown attempt * won't fully complete before this method is called, or this method may never return. */ void waitForStepDownAttempt_forTest(); @@ -615,13 +615,6 @@ private: Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const; - /** - * Wakes up threads in the process of handling a stepdown request based on whether the - * TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to - * complete. - */ - void _signalStepDownWaiterIfReady_inlock(); - bool _canAcceptWritesFor_inlock(const NamespaceString& ns); int _getMyId_inlock() const; @@ -1170,9 +1163,6 @@ private: // This member's index position in the current config. int _selfIndex; // (M) - // Condition to signal when new heartbeat data comes in. - stdx::condition_variable _stepDownWaiters; // (M) - std::unique_ptr<VoteRequester> _voteRequester; // (M) // Event that the election code will signal when the in-progress election completes. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 9ea9d13542f..d1269f78d1b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -212,9 +212,6 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( _updateLastCommittedOpTime_inlock(); } - // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. - _signalStepDownWaiterIfReady_inlock(); - // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing. if (_catchupState) { _catchupState->signalHeartbeatUpdate_inlock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index a40e0c6a015..538f8c4693d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -37,6 +37,7 @@ #include <vector> #include "mongo/bson/util/bson_extract.h" +#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/repl/is_master_response.h" @@ -1613,6 +1614,172 @@ TEST_F(ReplCoordTest, DrainCompletionMidStepDown) { // ASSERT_EQUALS(2, getReplCoord()->getTerm()); // SERVER-28290 } +TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) { + const auto repl = getReplCoord(); + + OpTimeWithTermOne opTime1(100, 1); + OpTimeWithTermOne opTime2(200, 1); + + repl->setMyLastAppliedOpTime(opTime2); + repl->setMyLastDurableOpTime(opTime2); + + // Secondaries not caught up yet. + ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1)); + ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, opTime1)); + + simulateSuccessfulV1Election(); + ASSERT_TRUE(repl->getMemberState().primary()); + + // Step down where the secondary actually has to catch up before the stepDown can succeed. + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); + + // The node has not been able to step down yet. + ASSERT_TRUE(repl->getMemberState().primary()); + + // Catch up one of the secondaries using only replSetUpdatePosition. + long long configVersion = repl->getConfig().getConfigVersion(); + UpdatePositionArgs updatePositionArgs; + + ASSERT_OK(updatePositionArgs.initialize( + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime2.asOpTime().toBSON()) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 2 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime1.asOpTime().toBSON()))))); + + ASSERT_OK(repl->processReplSetUpdatePosition(updatePositionArgs, &configVersion)); + + // Verify that stepDown completes successfully. + ASSERT_OK(*result.second.get()); + ASSERT_TRUE(repl->getMemberState().secondary()); +} + +class StepDownTestWithUnelectableNode : public StepDownTest { +private: + void setUp() override { + ReplCoordTest::setUp(); + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "protocolVersion" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "test1:1234") + << BSON("_id" << 1 << "host" + << "test2:1234" + << "priority" + << 0) + << BSON("_id" << 2 << "host" + << "test3:1234"))), + HostAndPort("test1", 1234)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + } +}; + +TEST_F(StepDownTestWithUnelectableNode, + UpdatePositionDuringStepDownWakesUpStepDownWaiterMoreThanOnce) { + const auto repl = getReplCoord(); + + OpTimeWithTermOne opTime1(100, 1); + OpTimeWithTermOne opTime2(200, 1); + + repl->setMyLastAppliedOpTime(opTime2); + repl->setMyLastDurableOpTime(opTime2); + + // No secondaries are caught up yet. + ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1)); + ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, opTime1)); + + simulateSuccessfulV1Election(); + ASSERT_TRUE(repl->getMemberState().primary()); + + // Step down where the secondary actually has to catch up before the stepDown can succeed. + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); + + // The node has not been able to step down yet. + ASSERT_TRUE(repl->getMemberState().primary()); + + // Use replSetUpdatePosition to catch up the first secondary, which is not electable. + // This will yield a majority at the primary's opTime, so the waiter will be woken up, + // but stepDown will not be able to complete. + long long configVersion = repl->getConfig().getConfigVersion(); + UpdatePositionArgs catchupFirstSecondary; + + ASSERT_OK(catchupFirstSecondary.initialize( + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime2.asOpTime().toBSON()) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 2 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime1.asOpTime().toBSON()))))); + + ASSERT_OK(repl->processReplSetUpdatePosition(catchupFirstSecondary, &configVersion)); + + // The primary has still not been able to finish stepping down. + ASSERT_TRUE(repl->getMemberState().primary()); + + // Now catch up the other secondary. This will wake up the waiter again, but this time + // there is an electable node, so stepDown will complete. + UpdatePositionArgs catchupOtherSecondary; + + ASSERT_OK(catchupOtherSecondary.initialize( + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime2.asOpTime().toBSON()) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << configVersion + << UpdatePositionArgs::kMemberIdFieldName + << 2 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableOpTimeFieldName + << opTime2.asOpTime().toBSON()))))); + + ASSERT_OK(repl->processReplSetUpdatePosition(catchupOtherSecondary, &configVersion)); + + // Verify that stepDown completes successfully. + ASSERT_OK(*result.second.get()); + ASSERT_TRUE(repl->getMemberState().secondary()); +} + TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { const auto opCtx = makeOperationContext(); @@ -1631,23 +1798,32 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { TEST_F(StepDownTest, NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) { OpTimeWithTermOne optime1(100, 1); - // All nodes are caught up + + // Set up this test so that all nodes are caught up. This is necessary to exclude the false + // positive case where stepDown returns "ExceededTimeLimit", but not because it could not + // acquire the lock, but because it could not satisfy all stepdown conditions on time. getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1)); simulateSuccessfulV1Election(); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); const auto opCtx = makeOperationContext(); - // Make sure stepDown cannot grab the global shared lock + // Make sure stepDown cannot grab the global exclusive lock. We need to use a different + // locker to test this, or otherwise stepDown will be granted the lock automatically. Lock::GlobalWrite lk(opCtx.get()); + ASSERT_TRUE(opCtx->lockState()->isW()); + auto locker = opCtx.get()->swapLockState(stdx::make_unique<DefaultLockerImpl>()); Status status = getReplCoord()->stepDown(opCtx.get(), false, Milliseconds(0), Milliseconds(1000)); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + + opCtx.get()->swapLockState(std::move(locker)); } /* Step Down Test for a 5-node replica set */ @@ -1988,10 +2164,6 @@ TEST_F(StepDownTest, ASSERT_TRUE(getReplCoord()->getMemberState().primary()); // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); catchUpSecondaries(optime2); @@ -2015,15 +2187,12 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); - // Secondary has not caught up on first round of heartbeats. + // Advance the clock by two seconds to allow for a round of heartbeats to be sent. The + // secondary will not appear to be caught up. enterNetwork(); - getNet()->runUntil(getNet()->now() + Milliseconds(1000)); + getNet()->runUntil(getNet()->now() + Milliseconds(2000)); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj; @@ -2087,10 +2256,6 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) { ASSERT_TRUE(getReplCoord()->getMemberState().primary()); // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); // We should still be primary at this point @@ -2126,12 +2291,6 @@ TEST_F(StepDownTest, UnconditionalStepDownFailsStepDownCommand) { ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. - // Start a stepdown command that needs to wait for secondaries to catch up. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); @@ -2166,12 +2325,6 @@ TEST_F(StepDownTest, InterruptingStepDownCommandRestoresWriteAvailability) { ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. - // Start a stepdown command that needs to wait for secondaries to catch up. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); @@ -2213,12 +2366,6 @@ TEST_F(StepDownTest, InterruptingAfterUnconditionalStepdownDoesNotRestoreWriteAv ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - // Step down where the secondary actually has to catch up before the stepDown can succeed. - // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for - // T + 2 seconds and send out a new round of heartbeats immediately. - // This makes it unnecessary to advance the clock after entering the network to process - // the heartbeat requests. - // Start a stepdown command that needs to wait for secondaries to catch up. auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 71ae073c80d..55f09997f7e 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -2334,7 +2334,9 @@ bool TopologyCoordinator::isSafeToStepDown() { continue; } UnelectableReasonMask reason = _getUnelectableReason(memberIndex); - if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) { + auto memberData = _memberData.at(memberIndex); + bool caughtUp = (memberData.getLastAppliedOpTime() >= lastOpApplied); + if (!reason && caughtUp) { // Found a caught up and electable node, succeed with step down. return true; } |