author     Spencer T Brody <spencer@mongodb.com>    2017-09-13 17:16:21 -0400
committer  Spencer T Brody <spencer@mongodb.com>    2017-09-25 16:30:40 -0400
commit     44d011a20109ef8769d881306e33bc92a2b44f4d
tree       30f1bb0082cc8541f7155a72a588f454ac4ba55a /src
parent     0939e1447aa5b4291c09c1fab2344a66098213dd
SERVER-28544 Change stepdown command to take global lock in exclusive mode
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp             203
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                26
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp    12
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp        145
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                        27
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp                 36
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h                   10
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp        328
8 files changed, 682 insertions, 105 deletions
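
The heart of this change is the new retry loop in ReplicationCoordinatorImpl::stepDown() shown in the first hunk below: enqueue the global lock in MODE_X, kill user operations so the lock can be acquired within the stepdown window, and then, while the TopologyCoordinator reports that stepdown cannot yet complete, release the global lock, wait on a condition variable signalled by heartbeat responses, and re-acquire the lock before retrying. The following is a minimal, self-contained sketch of that release-wait-reacquire pattern built from standard C++ primitives; the class, member names, and timeouts are illustrative stand-ins, not MongoDB's actual API.

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Illustrative stand-ins only; none of these are MongoDB types.
class StepDownSketch {
public:
    // Returns true once a "secondary" has caught up, false on timeout.
    bool stepDown(std::chrono::milliseconds timeout) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;

        // Try to get the exclusive "global" lock within the stepdown window
        // (roughly the EnqueueOnly + waitForLock pattern in the diff).
        std::unique_lock<std::timed_mutex> globalLock(_globalLock, timeout);
        if (!globalLock.owns_lock()) {
            return false;
        }

        std::unique_lock<std::mutex> lk(_mutex);
        while (!_secondaryCaughtUp) {
            if (std::chrono::steady_clock::now() >= deadline) {
                return false;  // ran out of time waiting for catch-up
            }

            // Release the global lock so other work can make progress, wait
            // for a progress signal, then re-acquire in a fixed order
            // (global lock before _mutex) to keep lock ordering consistent.
            globalLock.unlock();
            _progressSignal.wait_until(lk, deadline);
            lk.unlock();
            globalLock.lock();  // no timeout: we need it again regardless
            lk.lock();
        }
        return true;  // both locks held; the "stepdown" may now complete
    }

    // Analogous to a heartbeat response advancing a secondary's optime.
    void reportSecondaryCaughtUp() {
        {
            std::lock_guard<std::mutex> guard(_mutex);
            _secondaryCaughtUp = true;
        }
        _progressSignal.notify_all();
    }

private:
    std::timed_mutex _globalLock;             // stand-in for the global X lock
    std::mutex _mutex;                        // stand-in for the coordinator's _mutex
    std::condition_variable _progressSignal;  // stand-in for _stepDownWaiters
    bool _secondaryCaughtUp = false;
};

int main() {
    StepDownSketch sketch;
    std::thread secondary([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        sketch.reportSecondaryCaughtUp();
    });
    std::cout << "stepDown succeeded: " << std::boolalpha
              << sketch.stepDown(std::chrono::seconds(1)) << std::endl;
    secondary.join();
    return 0;
}
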
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 00d63c29d23..e6cfcd140b2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -568,10 +568,11 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
}
+ auto opCtx = cc().makeOperationContext();
stdx::unique_lock<stdx::mutex> lock(_mutex);
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(localConfig, myIndex.getValue());
+ _setCurrentRSConfig_inlock(opCtx.get(), localConfig, myIndex.getValue());
if (!lastOpTime.isNull()) {
_setMyLastAppliedOpTime_inlock(lastOpTime, false);
_setMyLastDurableOpTime_inlock(lastOpTime, false);
@@ -591,7 +592,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
if (!isArbiter) {
_externalState->startThreads(_settings);
- auto opCtx = cc().makeOperationContext();
_startDataReplication(opCtx.get());
}
}
@@ -777,7 +777,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
_opTimeWaiterList.signalAndRemoveAll_inlock();
_currentCommittedSnapshotCond.notify_all();
_initialSyncer.swap(initialSyncerCopy);
- _signalStepDownWaiter_inlock();
+ _stepDownWaiters.notify_all();
}
@@ -872,7 +872,8 @@ Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState)
_topCoord->setFollowerMode(newState.s);
- const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
+ const PostMemberStateUpdateAction action =
+ _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
lk.unlock();
_performPostMemberStateUpdateAction(action);
@@ -959,8 +960,11 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
// our election in onTransitionToPrimary(), above.
_updateLastCommittedOpTime_inlock();
+ // Update _canAcceptNonLocalWrites
invariant(!_canAcceptNonLocalWrites);
- _canAcceptNonLocalWrites = true;
+ _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
+ invariant(_canAcceptNonLocalWrites);
+
log() << "transition to primary complete; database writes are now permitted" << rsLog;
_drainFinishedCond.notify_all();
_externalState->startNoopWriter(_getMyLastAppliedOpTime_inlock());
@@ -1628,74 +1632,130 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
if (!getMemberState().primary()) {
// Note this check is inherently racy - it's always possible for the node to
- // stepdown from some other path before we acquire the global shared lock, but
- // that's okay because we are resiliant to that happening in _stepDownContinue.
+ // stepdown from some other path before we acquire the global exclusive lock. This check
+ // is just to try to save us from acquiring the global X lock unnecessarily.
return {ErrorCodes::NotMaster, "not primary so can't step down"};
}
- Lock::GlobalLock globalReadLock(
- opCtx, MODE_S, durationCount<Milliseconds>(stepdownTime), Lock::GlobalLock::EnqueueOnly());
+ auto globalLock = stdx::make_unique<Lock::GlobalLock>(
+ opCtx, MODE_X, durationCount<Milliseconds>(stepdownTime), Lock::GlobalLock::EnqueueOnly());
- // We've requested the global shared lock which will stop new writes from coming in,
- // but existing writes could take a long time to finish, so kill all user operations
+ // We've requested the global exclusive lock which will stop new operations from coming in,
+ // but existing operations could take a long time to finish, so kill all user operations
// to help us get the global lock faster.
_externalState->killAllUserOperations(opCtx);
- globalReadLock.waitForLock(durationCount<Milliseconds>(stepdownTime));
-
- if (!globalReadLock.isLocked()) {
+ globalLock->waitForLock(durationCount<Milliseconds>(stepdownTime));
+ if (!globalLock->isLocked()) {
return {ErrorCodes::ExceededTimeLimit,
"Could not acquire the global shared lock within the amount of time "
"specified that we should step down for"};
}
- PostMemberStateUpdateAction action = kActionNone;
- try {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
- Status status = _topCoord->prepareForStepDownAttempt();
- if (!status.isOK()) {
- // This will cause us to fail if we're already in the process of stepping down.
- return status;
- }
- // If we return before this stepdown completes successfully, reset our state back as it was
- // before we attempted the stepdown.
- auto guard = MakeGuard([this] { _topCoord->abortAttemptedStepDownIfNeeded(); });
+ auto status = opCtx->checkForInterruptNoAssert();
+ if (!status.isOK()) {
+ return status;
+ }
- opCtx->checkForInterrupt();
- if (!_tryToStepDown(lk, waitUntil, stepDownUntil, force)) {
- // We send out a fresh round of heartbeats because stepping down successfully
- // without {force: true} is dependent on timely heartbeat data.
- _restartHeartbeats_inlock();
- do {
- opCtx->waitForConditionOrInterruptUntil(
- _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil));
- } while (!_tryToStepDown(lk, waitUntil, stepDownUntil, force));
- }
- action = _updateMemberStateFromTopologyCoordinator_inlock();
- } catch (const DBException& ex) {
- return ex.toStatus();
+ const long long termAtStart = _topCoord->getTerm();
+
+ status = _topCoord->prepareForStepDownAttempt();
+ if (!status.isOK()) {
+ // This will cause us to fail if we're already in the process of stepping down.
+ return status;
}
- _performPostMemberStateUpdateAction(action);
+ // Update _canAcceptNonLocalWrites from the TopologyCoordinator now that we're in the middle
+ // of a stepdown attempt. This will prevent us from accepting writes so that if our stepdown
+ // attempt fails later we can release the global lock and go to sleep to allow secondaries to
+ // catch up without allowing new writes in.
+ auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
+ invariant(action == PostMemberStateUpdateAction::kActionNone);
+ invariant(!_canAcceptNonLocalWrites);
+
+ // Make sure that we leave _canAcceptNonLocalWrites in the proper state.
+ auto updateMemberState = [&] {
+ invariant(lk.owns_lock());
+ invariant(opCtx->lockState()->isW());
+ auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
+ lk.unlock();
+ _performPostMemberStateUpdateAction(action);
+ };
+ ScopeGuard onExitGuard = MakeGuard([&] {
+ _topCoord->abortAttemptedStepDownIfNeeded();
+ updateMemberState();
+ });
+
+ try {
+
+ bool firstTime = true;
+ while (!_topCoord->attemptStepDown(
+ termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) {
+
+ // The stepdown attempt failed.
+
+ if (firstTime) {
+ // We send out a fresh round of heartbeats because stepping down successfully
+ // without {force: true} is dependent on timely heartbeat data.
+ _restartHeartbeats_inlock();
+ firstTime = false;
+ }
+
+ // Now release the global lock to allow secondaries to read the oplog, then wait until
+ // enough secondaries are caught up for us to finish stepdown.
+ globalLock.reset();
+ invariant(!opCtx->lockState()->isLocked());
+
+ // Make sure we re-acquire the global lock before returning so that we're always
+ // holding the global lock when the onExitGuard set up earlier runs.
+ ON_BLOCK_EXIT([&] {
+ // Need to release _mutex before re-acquiring the global lock to preserve lock
+ // acquisition order rules.
+ lk.unlock();
+
+ // Need to re-acquire the global lock before re-attempting stepdown. We use no
+ // timeout here even though that means the lock acquisition could take longer than
+ // the stepdown window. If that happens, the call to attemptStepDown immediately
+ // after will error. Since we'll need the global lock no matter what to clean up a
+ // failed stepdown attempt, we might as well spend whatever time we need to acquire
+ // it now.
+ globalLock.reset(new Lock::GlobalLock(opCtx, MODE_X, UINT_MAX));
+ invariant(globalLock->isLocked());
+ lk.lock();
+ });
+
+ // We ignore the case where waitForConditionOrInterruptUntil returns
+ // stdx::cv_status::timeout because in that case coming back around the loop and calling
+ // attemptStepDown again will cause attemptStepDown to return ExceededTimeLimit with
+ // the proper error message.
+ opCtx->waitForConditionOrInterruptUntil(
+ _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil));
+ }
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+
+ // Stepdown success!
+ onExitGuard.Dismiss();
+ updateMemberState();
// Schedule work to (potentially) step back up once the stepdown period has ended.
_scheduleWorkAt(
stepDownUntil,
stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1));
-
return Status::OK();
}
-void ReplicationCoordinatorImpl::_signalStepDownWaiter_inlock() {
- _stepDownWaiters.notify_all();
-}
-
-bool ReplicationCoordinatorImpl::_tryToStepDown(WithLock,
- const Date_t waitUntil,
- const Date_t stepDownUntil,
- const bool force) {
- return _topCoord->attemptStepDown(_replExecutor->now(), waitUntil, stepDownUntil, force);
+void ReplicationCoordinatorImpl::_signalStepDownWaiterIfReady_inlock() {
+ if (_topCoord->isSafeToStepDown()) {
+ _stepDownWaiters.notify_all();
+ }
}
void ReplicationCoordinatorImpl::_handleTimePassing(
@@ -2024,7 +2084,8 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
return Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
}
- const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
+ const PostMemberStateUpdateAction action =
+ _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
lk.unlock();
_performPostMemberStateUpdateAction(action);
return Status::OK();
@@ -2258,7 +2319,8 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
}
const ReplSetConfig oldConfig = _rsConfig;
- const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
+ const PostMemberStateUpdateAction action =
+ _setCurrentRSConfig_inlock(opCtx.get(), newConfig, myIndex);
// On a reconfig we drop all snapshots so we don't mistakenely read from the wrong one.
// For example, if we change the meaning of the "committed" snapshot from applied -> durable.
@@ -2348,7 +2410,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
auto initialDataTS = SnapshotName(lastAppliedOpTime.getTimestamp().asULL());
_storage->setInitialDataTimestamp(getServiceContext(), initialDataTS);
- _finishReplSetInitiate(newConfig, myIndex.getValue());
+ _finishReplSetInitiate(opCtx, newConfig, myIndex.getValue());
// A configuration passed to replSetInitiate() with the current node as an arbiter
// will fail validation with a "replSet initiate got ... while validating" reason.
@@ -2360,12 +2422,13 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
return Status::OK();
}
-void ReplicationCoordinatorImpl::_finishReplSetInitiate(const ReplSetConfig& newConfig,
+void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx,
+ const ReplSetConfig& newConfig,
int myIndex) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigInitiating);
invariant(!_rsConfig.isInitialized());
- auto action = _setCurrentRSConfig_inlock(newConfig, myIndex);
+ auto action = _setCurrentRSConfig_inlock(opCtx, newConfig, myIndex);
lk.unlock();
_performPostMemberStateUpdateAction(action);
}
@@ -2378,7 +2441,22 @@ void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) {
}
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
-ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
+ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
+ OperationContext* opCtx) {
+ {
+ // We have to do this check even if our current and target state are the same as we might
+ // have just failed a stepdown attempt and thus are staying in PRIMARY state but restoring
+ // our ability to accept writes.
+ bool canAcceptWrites = _topCoord->canAcceptWrites();
+ if (canAcceptWrites != _canAcceptNonLocalWrites) {
+ // We must be holding the global X lock to change _canAcceptNonLocalWrites.
+ invariant(opCtx);
+ invariant(opCtx->lockState()->isW());
+ }
+ _canAcceptNonLocalWrites = canAcceptWrites;
+ }
+
+
const MemberState newState = _topCoord->getMemberState();
if (newState == _memberState) {
if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
@@ -2399,7 +2477,12 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_replicationWaiterList.signalAndRemoveAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAndRemoveAll_inlock();
- _canAcceptNonLocalWrites = false;
+ // If there are any pending stepdown command requests wake them up.
+ _stepDownWaiters.notify_all();
+
+ // _canAcceptNonLocalWrites should already be set above.
+ invariant(!_canAcceptNonLocalWrites);
+
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
result = kActionCloseAllConnections;
} else {
@@ -2516,7 +2599,7 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
_topCoord->processWinElection(_electionId, ts);
const PostMemberStateUpdateAction nextAction =
- _updateMemberStateFromTopologyCoordinator_inlock();
+ _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
invariant(nextAction != kActionWinElection);
lk.unlock();
_performPostMemberStateUpdateAction(nextAction);
@@ -2682,7 +2765,8 @@ Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& a
}
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
-ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newConfig,
+ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx,
+ const ReplSetConfig& newConfig,
int myIndex) {
invariant(_settings.usingReplSets());
_cancelHeartbeats_inlock();
@@ -2705,7 +2789,8 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
_cancelPriorityTakeover_inlock();
_cancelAndRescheduleElectionTimeout_inlock();
- const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
+ const PostMemberStateUpdateAction action =
+ _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
if (_selfIndex >= 0) {
// Don't send heartbeats if we're not in the config, if we get re-added one of the
// nodes in the set will contact us.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 12ab02dbff2..9eb62a9eea3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -595,7 +595,8 @@ private:
* Returns an action to be performed after unlocking _mutex, via
* _performPostMemberStateUpdateAction.
*/
- PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplSetConfig& newConfig,
+ PostMemberStateUpdateAction _setCurrentRSConfig_inlock(OperationContext* opCtx,
+ const ReplSetConfig& newConfig,
int myIndex);
/**
@@ -633,16 +634,11 @@ private:
Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;
/**
- * Triggers all callbacks that are blocked waiting for new heartbeat data
- * to decide whether or not to finish a step down.
+ * Wakes up threads in the process of handling a stepdown request based on whether the
+ * TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to
+ * complete.
*/
- void _signalStepDownWaiter_inlock();
-
- /**
- * Non-blocking helper method for the stepDown method, that represents executing
- * one attempt to step down.
- */
- bool _tryToStepDown(WithLock, Date_t waitUntil, Date_t stepdownUntil, bool force);
+ void _signalStepDownWaiterIfReady_inlock();
bool _canAcceptWritesFor_inlock(const NamespaceString& ns);
@@ -782,7 +778,9 @@ private:
/**
* Finishes the work of processReplSetInitiate() in the event of a successful quorum check.
*/
- void _finishReplSetInitiate(const ReplSetConfig& newConfig, int myIndex);
+ void _finishReplSetInitiate(OperationContext* opCtx,
+ const ReplSetConfig& newConfig,
+ int myIndex);
/**
* Finishes the work of processReplSetReconfig, in the event of
@@ -806,8 +804,12 @@ private:
* Returns an enum indicating what action to take after releasing _mutex, if any.
* Call performPostMemberStateUpdateAction on the return value after releasing
* _mutex.
+ *
+ * Note: opCtx may be null as currently not all paths thread an OperationContext all the way
+ * down, but it must be non-null for any calls that change _canAcceptNonLocalWrites.
*/
- PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock();
+ PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(
+ OperationContext* opCtx);
/**
* Performs a post member-state update action. Do not call while holding _mutex.
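
The note above about the nullable opCtx is the key contract of the reworked _updateMemberStateFromTopologyCoordinator_inlock(): callers that cannot change write availability may pass nullptr, while any call that flips _canAcceptNonLocalWrites must supply a context holding the global exclusive lock. A hypothetical, stand-alone illustration of that kind of invariant-guarded flag flip (the types here are made up for the example, not MongoDB's API):

#include <cassert>
#include <iostream>

// Hypothetical types for illustration only.
struct OperationContext {
    bool holdsGlobalExclusiveLock = false;
};

class Coordinator {
public:
    // opCtx may be null on paths that cannot change write availability, but
    // must be non-null (and hold the exclusive lock) whenever the flag flips.
    void updateMemberState(OperationContext* opCtx, bool canAcceptWritesNow) {
        if (canAcceptWritesNow != _canAcceptNonLocalWrites) {
            assert(opCtx && opCtx->holdsGlobalExclusiveLock);
        }
        _canAcceptNonLocalWrites = canAcceptWritesNow;
    }

    bool canAcceptNonLocalWrites() const { return _canAcceptNonLocalWrites; }

private:
    bool _canAcceptNonLocalWrites = false;
};

int main() {
    Coordinator coord;
    OperationContext opCtx;
    opCtx.holdsGlobalExclusiveLock = true;
    coord.updateMemberState(&opCtx, true);   // flag flips: context required
    coord.updateMemberState(nullptr, true);  // no change: null context is fine
    std::cout << coord.canAcceptNonLocalWrites() << std::endl;
    return 0;
}
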
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 66ffbb76192..1b02badec85 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -224,7 +224,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
}
// Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
- _signalStepDownWaiter_inlock();
+ _signalStepDownWaiterIfReady_inlock();
// Abort catchup if we have caught up to the latest known optime after heartbeat refreshing.
if (_catchupState) {
@@ -247,7 +247,7 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo
// Update the cached member state if different than the current topology member state
if (_memberState != _topCoord->getMemberState()) {
const PostMemberStateUpdateAction postUpdateAction =
- _updateMemberStateFromTopologyCoordinator_inlock();
+ _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
lock.unlock();
_performPostMemberStateUpdateAction(postUpdateAction);
lock.lock();
@@ -375,11 +375,10 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
auto opCtx = cc().makeOperationContext();
Lock::GlobalWrite globalExclusiveLock(opCtx.get());
- // TODO Add invariant that we've got global shared or global exclusive lock, when supported
- // by lock manager.
stdx::unique_lock<stdx::mutex> lk(_mutex);
+
_topCoord->finishUnconditionalStepDown();
- const auto action = _updateMemberStateFromTopologyCoordinator_inlock();
+ const auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx.get());
lk.unlock();
_performPostMemberStateUpdateAction(action);
_replExecutor->signalEvent(finishedEvent);
@@ -581,7 +580,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
- const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue);
+ const PostMemberStateUpdateAction action =
+ _setCurrentRSConfig_inlock(opCtx.get(), newConfig, myIndexValue);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index b41f5d6b908..2f29f836a10 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2040,6 +2040,9 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) {
// the heartbeat requests.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+ // We should still be primary at this point
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
// Now while the first stepdown request is waiting for secondaries to catch up, attempt another
// stepdown request and ensure it fails.
const auto opCtx = makeOperationContext();
@@ -2053,6 +2056,148 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) {
ASSERT_TRUE(repl->getMemberState().secondary());
}
+// Test that if a stepdown command is blocked waiting for secondaries to catch up when an
+// unconditional stepdown happens, the stepdown command fails.
+TEST_F(StepDownTest, UnconditionalStepDownFailsStepDownCommand) {
+ OpTime optime1(Timestamp(100, 1), 1);
+ OpTime optime2(Timestamp(100, 2), 1);
+
+ // No secondary is caught up
+ auto repl = getReplCoord();
+ repl->setMyLastAppliedOpTime(optime2);
+ repl->setMyLastDurableOpTime(optime2);
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulV1Election();
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
+ // T + 2 seconds and send out a new round of heartbeats immediately.
+ // This makes it unnecessary to advance the clock after entering the network to process
+ // the heartbeat requests.
+
+ // Start a stepdown command that needs to wait for secondaries to catch up.
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+
+ // We should still be primary at this point
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Now while the first stepdown request is waiting for secondaries to catch up, force an
+ // unconditional stepdown.
+ const auto opCtx = makeOperationContext();
+ ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(opCtx.get(), 2));
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // Ensure that the stepdown command failed.
+ ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, *result.second.get());
+}
+
+// Test that if a stepdown command that is blocked waiting for secondaries to catch up is
+// interrupted, we remain primary and restore our ability to accept writes rather than staying
+// in the write-blocked state the stepdown attempt put us in.
+TEST_F(StepDownTest, InterruptingStepDownCommandRestoresWriteAvailability) {
+ OpTime optime1(Timestamp(100, 1), 1);
+ OpTime optime2(Timestamp(100, 2), 1);
+
+ // No secondary is caught up
+ auto repl = getReplCoord();
+ repl->setMyLastAppliedOpTime(optime2);
+ repl->setMyLastDurableOpTime(optime2);
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulV1Election();
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
+ // T + 2 seconds and send out a new round of heartbeats immediately.
+ // This makes it unnecessary to advance the clock after entering the network to process
+ // the heartbeat requests.
+
+ // Start a stepdown command that needs to wait for secondaries to catch up.
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+
+ // We should still be primary at this point
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Interrupt the ongoing stepdown command.
+ {
+ stdx::lock_guard<Client> lk(*result.first.client.get());
+ result.first.opCtx->markKilled(ErrorCodes::Interrupted);
+ }
+
+ // Ensure that the stepdown command failed.
+ ASSERT_EQUALS(*result.second.get(), ErrorCodes::Interrupted);
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // This is the important check: that aborting the stepdown attempt restored our ability
+ // to accept writes.
+ const auto opCtx = makeOperationContext();
+ Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "admin"));
+}
+
+// Test that if a stepdown command is blocked waiting for secondaries to catch up when an
+// unconditional stepdown happens, and then is interrupted, we stay stepped down, even though
+// normally if we were just interrupted we would step back up.
+TEST_F(StepDownTest, InterruptingAfterUnconditionalStepdownDoesNotRestoreWriteAvailability) {
+ OpTime optime1(Timestamp(100, 1), 1);
+ OpTime optime2(Timestamp(100, 2), 1);
+
+ // No secondary is caught up
+ auto repl = getReplCoord();
+ repl->setMyLastAppliedOpTime(optime2);
+ repl->setMyLastDurableOpTime(optime2);
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulV1Election();
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
+ // T + 2 seconds and send out a new round of heartbeats immediately.
+ // This makes it unnecessary to advance the clock after entering the network to process
+ // the heartbeat requests.
+
+ // Start a stepdown command that needs to wait for secondaries to catch up.
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+
+ // We should still be primary at this point
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Interrupt the ongoing stepdown command.
+ {
+ stdx::lock_guard<Client> lk(*result.first.client.get());
+ result.first.opCtx->markKilled(ErrorCodes::Interrupted);
+ }
+
+ // Now while the first stepdown request is waiting for secondaries to catch up, force an
+ // unconditional stepdown.
+ const auto opCtx = makeOperationContext();
+ ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(opCtx.get(), 2));
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // Ensure that the stepdown command failed.
+ auto stepDownStatus = *result.second.get();
+ ASSERT_NOT_OK(stepDownStatus);
+ // Which code is returned is racy.
+ ASSERT(stepDownStatus == ErrorCodes::PrimarySteppedDown ||
+ stepDownStatus == ErrorCodes::Interrupted);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // This is the important check, that we didn't accidentally step back up when aborting the
+ // stepdown command attempt.
+ Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
+ ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "admin"));
+}
+
TEST_F(ReplCoordTest, GetReplicationModeNone) {
init();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index c17004fb61b..0716e04a546 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -119,6 +119,11 @@ public:
virtual MemberState getMemberState() const = 0;
/**
+ * Returns whether this node should be allowed to accept writes.
+ */
+ virtual bool canAcceptWrites() const = 0;
+
+ /**
* Returns true if this node is in the process of stepping down. Note that this can be
* due to an unconditional stepdown that must succeed (for instance from learning about a new
* term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that
@@ -575,17 +580,21 @@ public:
*
*
* If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method
- * returns true. Otherwise this method returns false if the stepdown currently can't succeed
- * but waiting for more time to pass could make it succeed, or throws an exception if this
- * stepdown attempt should be aborted.
- *
- * NOTE: It is illegal to call this method if the node is not a primary.
+ * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more
+ * time to pass could make it succeed, returns false. If the whole stepdown attempt should be
+ * abandoned (for example because the time limit expired or because we've already stepped down),
+ * throws an exception.
* TODO(spencer): Unify with the finishUnconditionalStepDown() method.
*/
- virtual bool attemptStepDown(Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force) = 0;
+ virtual bool attemptStepDown(
+ long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) = 0;
+
+ /**
+ * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument.
+ * This is essentially checking conditions C2 and C3 as described in the comment to
+ * attemptStepDown().
+ */
+ virtual bool isSafeToStepDown() = 0;
/**
* Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process
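
The comment above documents a tri-state contract for attemptStepDown(): return true when the stepdown can complete, return false when the caller should wait for more heartbeat data and retry, and throw when the attempt must be abandoned. A hypothetical caller-side sketch of that contract, with a fake coordinator standing in for the TopologyCoordinator:

#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>

// Fake stand-in for the real TopologyCoordinator; illustration only.
struct FakeCoordinator {
    int heartbeatsSeen = 0;

    // true  -> stepdown may complete now
    // false -> not yet; wait for more data and retry
    // throw -> abandon the whole attempt (e.g. out of time)
    bool attemptStepDown(std::chrono::steady_clock::time_point deadline) {
        if (std::chrono::steady_clock::now() >= deadline) {
            throw std::runtime_error("exceeded time limit");
        }
        return ++heartbeatsSeen >= 3;  // "succeeds" once enough data arrives
    }
};

int main() {
    FakeCoordinator coord;
    const auto deadline =
        std::chrono::steady_clock::now() + std::chrono::seconds(1);
    try {
        while (!coord.attemptStepDown(deadline)) {
            // The real code waits on a condition variable signalled by
            // heartbeat responses; a short sleep stands in for that here.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "stepdown completed" << std::endl;
    } catch (const std::exception& e) {
        std::cout << "stepdown abandoned: " << e.what() << std::endl;
    }
    return 0;
}
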
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 67231ed80ae..f6e1d24c5af 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2637,6 +2637,10 @@ MemberState TopologyCoordinatorImpl::getMemberState() const {
return _followerMode;
}
+bool TopologyCoordinatorImpl::canAcceptWrites() const {
+ return _leaderMode == LeaderMode::kMaster;
+}
+
void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) {
invariant(_role == Role::leader);
_electionTime = electionOpTime;
@@ -2672,20 +2676,15 @@ void TopologyCoordinatorImpl::processLoseElection() {
}
}
-bool TopologyCoordinatorImpl::attemptStepDown(Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force) {
- if (_role != Role::leader) {
- uasserted(ErrorCodes::NotMaster,
- "Already stepped down from primary while processing step down request");
- }
- if (_leaderMode == LeaderMode::kSteppingDown) {
+bool TopologyCoordinatorImpl::attemptStepDown(
+ long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
+
+ if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) {
uasserted(ErrorCodes::PrimarySteppedDown,
"While waiting for secondaries to catch up before stepping down, "
"this node decided to step down for other reasons");
}
-
+ invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
if (now >= stepDownUntil) {
uasserted(ErrorCodes::ExceededTimeLimit,
@@ -2693,7 +2692,7 @@ bool TopologyCoordinatorImpl::attemptStepDown(Date_t now,
"time we were supposed to step down until");
}
- if (!_canCompleteStepDownAttempt(now, waitUntil, stepDownUntil, force)) {
+ if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
// Stepdown attempt failed.
// Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
@@ -2705,6 +2704,8 @@ bool TopologyCoordinatorImpl::attemptStepDown(Date_t now,
<< "Please use the replSetStepDown command with the argument "
<< "{force: true} to force node to step down.");
}
+
+ // Stepdown attempt failed, but in a way that can be retried
return false;
}
@@ -2716,15 +2717,22 @@ bool TopologyCoordinatorImpl::attemptStepDown(Date_t now,
bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
Date_t waitUntil,
- Date_t stepDownUntil,
bool force) {
const bool forceNow = force && (now >= waitUntil);
- OpTime lastApplied = getMyLastAppliedOpTime();
-
if (forceNow) {
return true;
}
+ return isSafeToStepDown();
+}
+
+bool TopologyCoordinatorImpl::isSafeToStepDown() {
+ if (!_rsConfig.isInitialized() || _selfIndex < 0) {
+ return false;
+ }
+
+ OpTime lastApplied = getMyLastAppliedOpTime();
+
auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
invariant(tagStatus.isOK());
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index aca7365f279..aa30c194b1a 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -144,6 +144,7 @@ public:
virtual Role getRole() const;
virtual MemberState getMemberState() const;
+ virtual bool canAcceptWrites() const override;
virtual bool isSteppingDown() const override;
virtual HostAndPort getSyncSourceAddress() const;
virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
@@ -216,10 +217,12 @@ public:
virtual void processLoseElection();
virtual Status checkShouldStandForElection(Date_t now) const;
virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message);
- virtual bool attemptStepDown(Date_t now,
+ virtual bool attemptStepDown(long long termAtStart,
+ Date_t now,
Date_t waitUntil,
Date_t stepDownUntil,
bool force) override;
+ virtual bool isSafeToStepDown() override;
virtual bool prepareForUnconditionalStepDown() override;
virtual Status prepareForStepDownAttempt() override;
virtual void abortAttemptedStepDownIfNeeded() override;
@@ -398,10 +401,7 @@ private:
* Returns whether a stepdown attempt should be allowed to proceed. See the comment for
* attemptStepDown() for more details on the rules of when stepdown attempts succeed or fail.
*/
- bool _canCompleteStepDownAttempt(Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force);
+ bool _canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force);
void _stepDownSelfAndReplaceWith(int newPrimary);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index fb877f5295d..5eb29adfc98 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -117,6 +117,11 @@ protected:
getTopoCoord().completeTransitionToPrimary(firstOpTimeOfTerm);
}
+ void setMyOpTime(const OpTime& opTime) {
+ auto* myMemberData = getTopoCoord().getMyMemberData();
+ myMemberData->setLastAppliedOpTime(opTime, now());
+ }
+
void setSelfMemberState(const MemberState& newState) {
getTopoCoord().changeMemberState_forTest(newState);
}
@@ -4057,6 +4062,329 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
"primary, and therefore cannot call for catchup takeover");
}
+TEST_F(TopoCoordTest, StepDownAttemptFailsWhenNotPrimary) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ ASSERT_THROWS_CODE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, false),
+ DBException,
+ ErrorCodes::PrimarySteppedDown);
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsWhenAlreadySteppingDown) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ getTopoCoord().prepareForUnconditionalStepDown();
+
+ ASSERT_THROWS_CODE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, false),
+ DBException,
+ ErrorCodes::PrimarySteppedDown);
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsForDifferentTerm) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ ASSERT_THROWS_CODE(
+ getTopoCoord().attemptStepDown(term - 1, curTime, futureTime, futureTime, false),
+ DBException,
+ ErrorCodes::PrimarySteppedDown);
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsIfPastStepDownUntil) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ ASSERT_THROWS_CODE_AND_WHAT(
+ getTopoCoord().attemptStepDown(term, curTime, futureTime, curTime, false),
+ DBException,
+ ErrorCodes::ExceededTimeLimit,
+ "By the time we were ready to step down, we were already past the time we were supposed to "
+ "step down until");
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsIfPastWaitUntil) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ std::string expectedWhat = str::stream()
+ << "No electable secondaries caught up as of " << dateToISOStringLocal(curTime)
+ << "Please use the replSetStepDown command with the argument "
+ << "{force: true} to force node to step down.";
+ ASSERT_THROWS_CODE_AND_WHAT(
+ getTopoCoord().attemptStepDown(term, curTime, curTime, futureTime, false),
+ DBException,
+ ErrorCodes::ExceededTimeLimit,
+ expectedWhat);
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsIfNoSecondariesCaughtUp) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ setMyOpTime(OpTime(Timestamp(5, 0), term));
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ heartbeatFromMember(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+ heartbeatFromMember(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+
+ ASSERT_FALSE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, false));
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsIfNoSecondariesCaughtUpForceIsTrueButNotPastWaitUntil) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ setMyOpTime(OpTime(Timestamp(5, 0), term));
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ heartbeatFromMember(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+ heartbeatFromMember(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+
+ ASSERT_FALSE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, true));
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptSucceedsIfNoSecondariesCaughtUpForceIsTrueAndPastWaitUntil) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ setMyOpTime(OpTime(Timestamp(5, 0), term));
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ heartbeatFromMember(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+ heartbeatFromMember(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+
+ ASSERT_TRUE(getTopoCoord().attemptStepDown(term, curTime, curTime, futureTime, true));
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptSucceedsIfSecondariesCaughtUp) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ setMyOpTime(OpTime(Timestamp(5, 0), term));
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ heartbeatFromMember(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(5, 0), term));
+ heartbeatFromMember(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+
+ ASSERT_TRUE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, false));
+}
+
+TEST_F(TopoCoordTest, StepDownAttemptFailsIfSecondaryCaughtUpButNotElectable) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017"
+ << "priority"
+ << 0
+ << "hidden"
+ << true)
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+ const auto term = getTopoCoord().getTerm();
+ Date_t curTime = now();
+ Date_t futureTime = curTime + Seconds(1);
+
+ makeSelfPrimary();
+ setMyOpTime(OpTime(Timestamp(5, 0), term));
+ ASSERT_OK(getTopoCoord().prepareForStepDownAttempt());
+
+ heartbeatFromMember(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(5, 0), term));
+ heartbeatFromMember(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(4, 0), term));
+
+ ASSERT_FALSE(getTopoCoord().attemptStepDown(term, curTime, futureTime, futureTime, false));
+}
+
TEST_F(HeartbeatResponseTestV1,
ScheduleACatchupTakeoverWhenElectableAndReceiveHeartbeatFromPrimaryInCatchup) {
updateConfig(BSON("_id"