author     Vesselina Ratcheva <vesselina.ratcheva@10gen.com>   2018-06-21 13:18:09 -0400
committer  Vesselina Ratcheva <vesselina.ratcheva@10gen.com>   2018-07-02 21:03:07 -0400
commit     925a113194e00e193318486f576d14e6c3e27ea1 (patch)
tree       21e3e838e7d5a91b79a17c6dee2b45d4aedbe8e0 /src/mongo/db/repl
parent     f683298f0f85129b4eaf0c16244fe984943f42ce (diff)
SERVER-35058 Do not rely only on heartbeats to signal secondary positions in the stepdown command
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp            |  51
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h              |  12
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp  |   3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp       | 215
-rw-r--r--  src/mongo/db/repl/topology_coordinator.cpp                    |   4
5 files changed, 208 insertions, 77 deletions
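In broad strokes, the patch replaces the dedicated `_stepDownWaiters` condition variable, which was signalled only from the heartbeat path, with a `ThreadWaiter` registered on `_replicationWaiterList`, so that progress reported via either heartbeats or replSetUpdatePosition can wake a pending stepdown. Below is a minimal sketch of that waiter-list pattern; the types are stand-ins for illustration, not the real `repl::OpTime`, `ThreadWaiter`, or `WriteConcernOptions` classes.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <vector>

using OpTime = std::uint64_t;  // stand-in for repl::OpTime

// A waiter parks on a condition variable until a majority of nodes
// reaches its target optime.
struct Waiter {
    OpTime opTime;
    std::condition_variable* condVar;
};

class WaiterList {
public:
    // All methods are assumed to run under the coordinator mutex,
    // mirroring the _inlock naming convention in the patch.
    void add_inlock(Waiter* w) {
        _waiters.push_back(w);
    }
    void remove_inlock(Waiter* w) {
        _waiters.erase(std::remove(_waiters.begin(), _waiters.end(), w), _waiters.end());
    }
    // Called on every position change -- heartbeat or updatePosition --
    // so stepdown no longer depends on heartbeat timing alone.
    void signalIf_inlock(OpTime majorityOpTime) {
        for (Waiter* w : _waiters) {
            if (majorityOpTime >= w->opTime) {
                w->condVar->notify_all();
            }
        }
    }

private:
    std::vector<Waiter*> _waiters;
};
```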
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 35875f1bb33..f65975365a1 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -821,7 +821,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
_opTimeWaiterList.signalAll_inlock();
_currentCommittedSnapshotCond.notify_all();
_initialSyncer.swap(initialSyncerCopy);
- _stepDownWaiters.notify_all();
}
@@ -1605,9 +1604,14 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
}
void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_topCoord->isSteppingDown()) {
- _stepDownWaiters.wait(lk);
+ auto isSteppingDown = [&]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ // If true, we know that a stepdown is underway.
+ return (_topCoord->isSteppingDown());
+ };
+
+ while (!isSteppingDown()) {
+ sleepFor(Milliseconds{10});
}
}
@@ -1662,9 +1666,6 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
return status;
}
- // Wake up threads blocked in waitForStepDownAttempt_forTest.
- _stepDownWaiters.notify_all();
-
// Update _canAcceptNonLocalWrites from the TopologyCoordinator now that we're in the middle
// of a stepdown attempt. This will prevent us from accepting writes so that if our stepdown
// attempt fails later we can release the global lock and go to sleep to allow secondaries to
@@ -1705,21 +1706,23 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
try {
- bool firstTime = true;
- while (!_topCoord->attemptStepDown(
- termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) {
+ auto waitTimeout = std::min(waitTime, stepdownTime);
+ auto lastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
- // The stepdown attempt failed.
+ // Set up a waiter which will be signalled when we process a heartbeat or updatePosition
+ // and have a majority of nodes at our optime.
+ stdx::condition_variable condVar;
+ const WriteConcernOptions waiterWriteConcern(
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::NONE, waitTimeout);
+ ThreadWaiter waiter(lastAppliedOpTime, &waiterWriteConcern, &condVar);
+ WaiterGuard guard(&_replicationWaiterList, &waiter);
- if (firstTime) {
- // We send out a fresh round of heartbeats because stepping down successfully
- // without {force: true} is dependent on timely heartbeat data.
- _restartHeartbeats_inlock();
- firstTime = false;
- }
+ while (!_topCoord->attemptStepDown(
+ termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) {
- // Now release the global lock to allow secondaries to read the oplog, then wait until
- // enough secondaries are caught up for us to finish stepdown.
+ // The stepdown attempt failed. We now release the global lock to allow secondaries
+ // to read the oplog, then wait until enough secondaries are caught up for us to
+ // finish stepdown.
globalLock.reset();
invariant(!opCtx->lockState()->isLocked());
@@ -1748,7 +1751,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
// attemptStepDown again will cause attemptStepDown to return ExceededTimeLimit with
// the proper error message.
opCtx->waitForConditionOrInterruptUntil(
- _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil));
+ condVar, lk, std::min(stepDownUntil, waitUntil));
}
} catch (const DBException& e) {
return e.toStatus();
@@ -1764,12 +1767,6 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
return Status::OK();
}
-void ReplicationCoordinatorImpl::_signalStepDownWaiterIfReady_inlock() {
- if (_topCoord->isSafeToStepDown()) {
- _stepDownWaiters.notify_all();
- }
-}
-
void ReplicationCoordinatorImpl::_handleTimePassing(
const executor::TaskExecutor::CallbackArgs& cbData) {
if (!cbData.status.isOK()) {
@@ -2444,8 +2441,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
_replicationWaiterList.signalAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAll_inlock();
- // If there are any pending stepdown command requests wake them up.
- _stepDownWaiters.notify_all();
// _canAcceptNonLocalWrites should already be set above.
invariant(!_canAcceptNonLocalWrites);
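The `WaiterGuard` used in the stepDown hunk above is an RAII helper: because the wait loop sits inside a try/catch that returns `e.toStatus()`, the guard is what guarantees the waiter is unregistered on every exit path. A hedged sketch of the idea follows; the names mirror the patch, but the bodies are illustrative only.

```cpp
#include <condition_variable>
#include <list>

struct ThreadWaiter {
    std::condition_variable* condVar;
};

class WaiterList {
public:
    void add_inlock(ThreadWaiter* w) { _list.push_back(w); }
    void remove_inlock(ThreadWaiter* w) { _list.remove(w); }

private:
    std::list<ThreadWaiter*> _list;
};

// Registers the waiter on construction and removes it on destruction, so an
// exception thrown while waiting on attemptStepDown() cannot leave a dangling
// waiter behind in the list.
class WaiterGuard {
public:
    WaiterGuard(WaiterList* list, ThreadWaiter* waiter) : _list(list), _waiter(waiter) {
        _list->add_inlock(_waiter);
    }
    ~WaiterGuard() {
        _list->remove_inlock(_waiter);
    }

private:
    WaiterList* _list;
    ThreadWaiter* _waiter;
};
```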
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 35eca16fcb3..0416df74823 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -380,7 +380,7 @@ public:
void waitForElectionDryRunFinish_forTest();
/**
- * Waits until a stepdown command has begun. Callers should ensure that the stepdown attempt
+ * Waits until a stepdown attempt has begun. Callers should ensure that the stepdown attempt
* won't fully complete before this method is called, or this method may never return.
*/
void waitForStepDownAttempt_forTest();
@@ -615,13 +615,6 @@ private:
Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;
- /**
- * Wakes up threads in the process of handling a stepdown request based on whether the
- * TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to
- * complete.
- */
- void _signalStepDownWaiterIfReady_inlock();
-
bool _canAcceptWritesFor_inlock(const NamespaceString& ns);
int _getMyId_inlock() const;
@@ -1170,9 +1163,6 @@ private:
// This member's index position in the current config.
int _selfIndex; // (M)
- // Condition to signal when new heartbeat data comes in.
- stdx::condition_variable _stepDownWaiters; // (M)
-
std::unique_ptr<VoteRequester> _voteRequester; // (M)
// Event that the election code will signal when the in-progress election completes.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 9ea9d13542f..d1269f78d1b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -212,9 +212,6 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
_updateLastCommittedOpTime_inlock();
}
- // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
- _signalStepDownWaiterIfReady_inlock();
-
// Abort catchup if we have caught up to the latest known optime after heartbeat refreshing.
if (_catchupState) {
_catchupState->signalHeartbeatUpdate_inlock();
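With `_signalStepDownWaiterIfReady_inlock()` removed from the heartbeat path, both ingestion paths (heartbeat responses and replSetUpdatePosition) can wake stepdown through the same waiter list. The sketch below is illustrative only; in the real coordinator this funnels through `_replicationWaiterList`.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

struct PositionUpdate {
    int memberId;
    std::uint64_t appliedOpTime;
};

class CoordinatorSketch {
public:
    // Both entry points funnel into one handler, so a stepdown waiter is
    // woken regardless of how the secondary's position arrived.
    void onHeartbeatResponse(const PositionUpdate& update) {
        _onPositionChange(update);
    }
    void onReplSetUpdatePosition(const PositionUpdate& update) {
        _onPositionChange(update);
    }
    void addWaiter(std::function<void()> wake) {
        _waiters.push_back(std::move(wake));
    }

private:
    void _onPositionChange(const PositionUpdate&) {
        // Signal every waiter; each one re-checks its own predicate
        // (e.g. "is a majority at my optime?") before proceeding.
        for (auto& wake : _waiters) {
            wake();
        }
    }
    std::vector<std::function<void()>> _waiters;
};
```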
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index a40e0c6a015..538f8c4693d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -37,6 +37,7 @@
#include <vector>
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/repl/is_master_response.h"
@@ -1613,6 +1614,172 @@ TEST_F(ReplCoordTest, DrainCompletionMidStepDown) {
// ASSERT_EQUALS(2, getReplCoord()->getTerm()); // SERVER-28290
}
+TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) {
+ const auto repl = getReplCoord();
+
+ OpTimeWithTermOne opTime1(100, 1);
+ OpTimeWithTermOne opTime2(200, 1);
+
+ repl->setMyLastAppliedOpTime(opTime2);
+ repl->setMyLastDurableOpTime(opTime2);
+
+ // Secondaries not caught up yet.
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, opTime1));
+
+ simulateSuccessfulV1Election();
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+
+ // The node has not been able to step down yet.
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Catch up one of the secondaries using only replSetUpdatePosition.
+ long long configVersion = repl->getConfig().getConfigVersion();
+ UpdatePositionArgs updatePositionArgs;
+
+ ASSERT_OK(updatePositionArgs.initialize(
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON())
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime1.asOpTime().toBSON())))));
+
+ ASSERT_OK(repl->processReplSetUpdatePosition(updatePositionArgs, &configVersion));
+
+ // Verify that stepDown completes successfully.
+ ASSERT_OK(*result.second.get());
+ ASSERT_TRUE(repl->getMemberState().secondary());
+}
+
+class StepDownTestWithUnelectableNode : public StepDownTest {
+private:
+ void setUp() override {
+ ReplCoordTest::setUp();
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "protocolVersion"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234"
+ << "priority"
+ << 0)
+ << BSON("_id" << 2 << "host"
+ << "test3:1234"))),
+ HostAndPort("test1", 1234));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ }
+};
+
+TEST_F(StepDownTestWithUnelectableNode,
+ UpdatePositionDuringStepDownWakesUpStepDownWaiterMoreThanOnce) {
+ const auto repl = getReplCoord();
+
+ OpTimeWithTermOne opTime1(100, 1);
+ OpTimeWithTermOne opTime2(200, 1);
+
+ repl->setMyLastAppliedOpTime(opTime2);
+ repl->setMyLastDurableOpTime(opTime2);
+
+ // No secondaries are caught up yet.
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, opTime1));
+
+ simulateSuccessfulV1Election();
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+
+ // The node has not been able to step down yet.
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Use replSetUpdatePosition to catch up the first secondary, which is not electable.
+ // This will yield a majority at the primary's opTime, so the waiter will be woken up,
+ // but stepDown will not be able to complete.
+ long long configVersion = repl->getConfig().getConfigVersion();
+ UpdatePositionArgs catchupFirstSecondary;
+
+ ASSERT_OK(catchupFirstSecondary.initialize(
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON())
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime1.asOpTime().toBSON())))));
+
+ ASSERT_OK(repl->processReplSetUpdatePosition(catchupFirstSecondary, &configVersion));
+
+ // The primary has still not been able to finish stepping down.
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Now catch up the other secondary. This will wake up the waiter again, but this time
+ // there is an electable node, so stepDown will complete.
+ UpdatePositionArgs catchupOtherSecondary;
+
+ ASSERT_OK(catchupOtherSecondary.initialize(
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON())
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON())))));
+
+ ASSERT_OK(repl->processReplSetUpdatePosition(catchupOtherSecondary, &configVersion));
+
+ // Verify that stepDown completes successfully.
+ ASSERT_OK(*result.second.get());
+ ASSERT_TRUE(repl->getMemberState().secondary());
+}
+
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
const auto opCtx = makeOperationContext();
@@ -1631,23 +1798,32 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) {
OpTimeWithTermOne optime1(100, 1);
- // All nodes are caught up
+
+ // Set up this test so that all nodes are caught up. This is necessary to exclude the false
+ // positive case where stepDown returns "ExceededTimeLimit" not because it could not
+ // acquire the lock, but because it could not satisfy all stepdown conditions on time.
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1));
simulateSuccessfulV1Election();
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
const auto opCtx = makeOperationContext();
- // Make sure stepDown cannot grab the global shared lock
+ // Make sure stepDown cannot grab the global exclusive lock. We need to use a different
+ // locker to test this; otherwise stepDown would be granted the lock automatically.
Lock::GlobalWrite lk(opCtx.get());
+ ASSERT_TRUE(opCtx->lockState()->isW());
+ auto locker = opCtx.get()->swapLockState(stdx::make_unique<DefaultLockerImpl>());
Status status =
getReplCoord()->stepDown(opCtx.get(), false, Milliseconds(0), Milliseconds(1000));
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ opCtx.get()->swapLockState(std::move(locker));
}
/* Step Down Test for a 5-node replica set */
@@ -1988,10 +2164,6 @@ TEST_F(StepDownTest,
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
// Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
catchUpSecondaries(optime2);
@@ -2015,15 +2187,12 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
// Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
- // Secondary has not caught up on first round of heartbeats.
+ // Advance the clock by two seconds to allow for a round of heartbeats to be sent. The
+ // secondary will not appear to be caught up.
enterNetwork();
- getNet()->runUntil(getNet()->now() + Milliseconds(1000));
+ getNet()->runUntil(getNet()->now() + Milliseconds(2000));
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj;
@@ -2087,10 +2256,6 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) {
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
// Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
// We should still be primary at this point
@@ -2126,12 +2291,6 @@ TEST_F(StepDownTest, UnconditionalStepDownFailsStepDownCommand) {
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- // Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
-
// Start a stepdown command that needs to wait for secondaries to catch up.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
@@ -2166,12 +2325,6 @@ TEST_F(StepDownTest, InterruptingStepDownCommandRestoresWriteAvailability) {
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- // Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
-
// Start a stepdown command that needs to wait for secondaries to catch up.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
@@ -2213,12 +2366,6 @@ TEST_F(StepDownTest, InterruptingAfterUnconditionalStepdownDoesNotRestoreWriteAv
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- // Step down where the secondary actually has to catch up before the stepDown can succeed.
- // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
- // T + 2 seconds and send out a new round of heartbeats immediately.
- // This makes it unnecessary to advance the clock after entering the network to process
- // the heartbeat requests.
-
// Start a stepdown command that needs to wait for secondaries to catch up.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 71ae073c80d..55f09997f7e 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -2334,7 +2334,9 @@ bool TopologyCoordinator::isSafeToStepDown() {
continue;
}
UnelectableReasonMask reason = _getUnelectableReason(memberIndex);
- if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) {
+ auto memberData = _memberData.at(memberIndex);
+ bool caughtUp = (memberData.getLastAppliedOpTime() >= lastOpApplied);
+ if (!reason && caughtUp) {
// Found a caught up and electable node, succeed with step down.
return true;
}
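This last hunk swaps `getHeartbeatAppliedOpTime()` for `getLastAppliedOpTime()`, so a member counts as caught up based on its most recently reported position from any source, not just its last heartbeat. A simplified rendering of the predicate, using a stand-in `Member` type rather than the real `MemberData`:

```cpp
#include <cstdint>
#include <vector>

using OpTime = std::uint64_t;  // stand-in for repl::OpTime

struct Member {
    bool isSelf;
    bool electable;            // stands in for !_getUnelectableReason(...)
    OpTime lastAppliedOpTime;  // stands in for MemberData::getLastAppliedOpTime()
};

// Step down only once at least one *electable* member has applied everything
// the primary has; an unelectable (e.g. priority-0) member catching up is not
// enough, which is exactly what StepDownTestWithUnelectableNode exercises.
bool isSafeToStepDown(const std::vector<Member>& members, OpTime lastOpApplied) {
    for (const Member& m : members) {
        if (m.isSelf) {
            continue;
        }
        bool caughtUp = m.lastAppliedOpTime >= lastOpApplied;
        if (m.electable && caughtUp) {
            return true;
        }
    }
    return false;
}
```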