summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Samy Lanka <samy.lanka@10gen.com> 2017-06-19 15:43:11 -0400
committer Samy Lanka <samy.lanka@10gen.com> 2017-06-30 16:23:39 -0400
commit 453028f378f8344197cea0e3ded53f445ccd5abf (patch)
tree 8aca7d4ab9c7f34f135cc3659a6c828336f214fc /src
parent c63465a42ed89ee6563841d7b349fa85de69963e (diff)
download mongo-453028f378f8344197cea0e3ded53f445ccd5abf.tar.gz
SERVER-29499 Schedule a catchup takeover on receiving heartbeats from the current primary
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/heartbeat_response_action.cpp 6
-rw-r--r-- src/mongo/db/repl/heartbeat_response_action.h 9
-rw-r--r-- src/mongo/db/repl/repl_set_config.cpp 5
-rw-r--r-- src/mongo/db/repl/repl_set_config.h 7
-rw-r--r-- src/mongo/db/repl/repl_set_config_test.cpp 25
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 20
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h 26
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp 407
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp 29
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.cpp 28
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp 70
11 files changed, 606 insertions, 26 deletions
diff --git a/src/mongo/db/repl/heartbeat_response_action.cpp b/src/mongo/db/repl/heartbeat_response_action.cpp
index 7e787d42c96..014e021a4eb 100644
--- a/src/mongo/db/repl/heartbeat_response_action.cpp
+++ b/src/mongo/db/repl/heartbeat_response_action.cpp
@@ -49,6 +49,12 @@ HeartbeatResponseAction HeartbeatResponseAction::makePriorityTakeoverAction() {
return result;
}
+HeartbeatResponseAction HeartbeatResponseAction::makeCatchupTakeoverAction() {
+ HeartbeatResponseAction result;
+ result._action = CatchupTakeover;  // only the action kind is set; nothing else is assigned here
+ return result;
+}
+
HeartbeatResponseAction HeartbeatResponseAction::makeElectAction() {
HeartbeatResponseAction result;
result._action = StartElection;
diff --git a/src/mongo/db/repl/heartbeat_response_action.h b/src/mongo/db/repl/heartbeat_response_action.h
index bb009600e5a..191448dd9bc 100644
--- a/src/mongo/db/repl/heartbeat_response_action.h
+++ b/src/mongo/db/repl/heartbeat_response_action.h
@@ -50,7 +50,8 @@ public:
StartElection,
StepDownSelf,
StepDownRemotePrimary,
- PriorityTakeover
+ PriorityTakeover,
+ CatchupTakeover
};
/**
@@ -75,6 +76,12 @@ public:
static HeartbeatResponseAction makePriorityTakeoverAction();
/**
+ * Makes a new action telling the current node to schedule an event to attempt to elect itself
+ * primary after the appropriate catchup takeover delay.
+ */
+ static HeartbeatResponseAction makeCatchupTakeoverAction();
+
+ /**
* Makes a new action telling the current node to step down as primary.
*
* It is an error to call this with primaryIndex != the index of the current node.
diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp
index 0cd9945b7dc..cace43edfcd 100644
--- a/src/mongo/db/repl/repl_set_config.cpp
+++ b/src/mongo/db/repl/repl_set_config.cpp
@@ -54,6 +54,7 @@ const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000);
const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout);
const bool ReplSetConfig::kDefaultChainingAllowed(true);
+const Milliseconds ReplSetConfig::kDefaultCatchupTakeoverDelay(30000);
namespace {
@@ -830,6 +831,10 @@ Milliseconds ReplSetConfig::getPriorityTakeoverDelay(int memberIdx) const {
return (priorityRank + 1) * getElectionTimeoutPeriod();
}
+Milliseconds ReplSetConfig::getCatchupTakeoverDelay() const {
+ return kDefaultCatchupTakeoverDelay;  // always the 30000 ms default; not derived from settings
+}
+
int ReplSetConfig::_calculatePriorityRank(double priority) const {
int count = 0;
for (MemberIterator mem = membersBegin(); mem != membersEnd(); mem++) {
diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h
index 3f226ae9295..0b8b868d747 100644
--- a/src/mongo/db/repl/repl_set_config.h
+++ b/src/mongo/db/repl/repl_set_config.h
@@ -66,6 +66,7 @@ public:
static const Seconds kDefaultHeartbeatTimeoutPeriod;
static const Milliseconds kDefaultCatchUpTimeoutPeriod;
static const bool kDefaultChainingAllowed;
+ static const Milliseconds kDefaultCatchupTakeoverDelay;
/**
* Initializes this ReplSetConfig from the contents of "cfg".
@@ -340,6 +341,12 @@ public:
*/
Milliseconds getPriorityTakeoverDelay(int memberIdx) const;
+ /**
+ * Returns the duration to wait before running for election when this node
+ * sees that it is more caught up than the current primary.
+ */
+ Milliseconds getCatchupTakeoverDelay() const;
+
private:
/**
* Parses the "settings" subdocument of a replica set configuration.
diff --git a/src/mongo/db/repl/repl_set_config_test.cpp b/src/mongo/db/repl/repl_set_config_test.cpp
index 8f582195b51..f9d0ec01f9d 100644
--- a/src/mongo/db/repl/repl_set_config_test.cpp
+++ b/src/mongo/db/repl/repl_set_config_test.cpp
@@ -1632,6 +1632,31 @@ TEST(ReplSetConfig, GetPriorityTakeoverDelay) {
ASSERT_EQUALS(Milliseconds(1000), configB.getPriorityTakeoverDelay(4));
}
+TEST(ReplSetConfig, GetCatchupTakeoverDelay) {
+ ReplSetConfig configA;
+ ASSERT_OK(configA.initialize(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "localhost:12345"
+ << "priority"
+ << 1)
+ << BSON("_id" << 1 << "host"
+ << "localhost:54321"
+ << "priority"
+ << 2)
+ << BSON("_id" << 2 << "host"
+ << "localhost:5321"
+ << "priority"
+ << 3))
+ << "settings"
+ << BSON("electionTimeoutMillis" << 1000))));
+ ASSERT_OK(configA.validate());
+ ASSERT_EQUALS(Milliseconds(30000), configA.getCatchupTakeoverDelay());
+}
+
TEST(ReplSetConfig, ConfirmDefaultValuesOfAndAbilityToSetWriteConcernMajorityJournalDefault) {
// PV0, should default to false.
ReplSetConfig config;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index bb2c96e7d20..045deec2bae 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -362,6 +362,19 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest(
return _priorityTakeoverWhen;
}
+boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);  // handle and timestamp are both annotated (M) in the header
+ if (!_catchupTakeoverCbh.isValid()) {  // no catchup takeover callback is currently scheduled
+ return boost::none;
+ }
+ return _catchupTakeoverWhen;  // the scheduled callback will not run before this time
+}
+
+executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTakeoverCbh_forTest()
+ const {
+ return _catchupTakeoverCbh;  // NOTE(review): read without taking _mutex, unlike getCatchupTakeover_forTest(); confirm safe
+}
+
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _getCurrentCommittedSnapshotOpTime_inlock();
@@ -2418,6 +2431,12 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
invariant(_uncommittedSnapshots.empty());
}
+ // If we are transitioning from secondary, cancel any scheduled takeovers.
+ if (_memberState.secondary()) {
+ _cancelCatchupTakeover_inlock();
+ _cancelPriorityTakeover_inlock();
+ }
+
_memberState = newState;
log() << "transition to " << newState.toString() << rsLog;
@@ -2641,6 +2660,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
log() << "This node is not a member of the config";
}
+ _cancelCatchupTakeover_inlock();
_cancelPriorityTakeover_inlock();
_cancelAndRescheduleElectionTimeout_inlock();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 8ec05babf24..1fbc2d059fc 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -361,6 +361,17 @@ public:
boost::optional<Date_t> getPriorityTakeover_forTest() const;
/**
+ * Returns the scheduled time of the catchup takeover callback. If a catchup
+ * takeover has not been scheduled, returns boost::none.
+ */
+ boost::optional<Date_t> getCatchupTakeover_forTest() const;
+
+ /**
+ * Returns the catchup takeover CallbackHandle.
+ */
+ executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const;
+
+ /**
* Simple wrappers around _setLastOptime_inlock to make it easier to test.
*/
Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
@@ -1033,6 +1044,11 @@ private:
void _cancelPriorityTakeover_inlock();
/**
+ * Cancels all outstanding _catchupTakeover callbacks.
+ */
+ void _cancelCatchupTakeover_inlock();
+
+ /**
* Cancels the current _handleElectionTimeout callback and reschedules a new callback.
* Returns immediately otherwise.
*/
@@ -1282,7 +1298,7 @@ private:
// Used for testing only.
Date_t _handleElectionTimeoutWhen; // (M)
- // Callback Handle used to cancel a scheduled PriorityTakover callback.
+ // Callback Handle used to cancel a scheduled PriorityTakeover callback.
executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh; // (M)
// Priority takeover callback will not run before this time.
@@ -1290,6 +1306,14 @@ private:
// Used for testing only.
Date_t _priorityTakeoverWhen; // (M)
+ // Callback Handle used to cancel a scheduled CatchupTakeover callback.
+ executor::TaskExecutor::CallbackHandle _catchupTakeoverCbh; // (M)
+
+ // Catchup takeover callback will not run before this time.
+ // If this date is Date_t(), the callback is either unscheduled or canceled.
+ // Used for testing only.
+ Date_t _catchupTakeoverWhen; // (M)
+
// Callback handle used by _waitForStartUpComplete() to block until configuration
// is loaded and external state threads have been started (unless this node is an arbiter).
CallbackHandle _finishLoadLocalConfigCbh; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 3bbb4b783d7..6248e4e6423 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -791,16 +791,16 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) {
countLogLinesContaining("not becoming primary, we have been superceded already"));
}
-class PriorityTakeoverTest : public ReplCoordTest {
+class TakeoverTest : public ReplCoordTest {
public:
/*
* Verify that a given priority takeover delay is valid. Takeover delays are
* verified in terms of bounds since the delay value is randomized.
*/
- void assertValidTakeoverDelay(ReplSetConfig config,
- Date_t now,
- Date_t priorityTakeoverTime,
- int nodeIndex) {
+ void assertValidPriorityTakeoverDelay(ReplSetConfig config,
+ Date_t now,
+ Date_t priorityTakeoverTime,
+ int nodeIndex) {
Milliseconds priorityTakeoverDelay = priorityTakeoverTime - now;
Milliseconds electionTimeout = config.getElectionTimeoutPeriod();
@@ -922,7 +922,384 @@ private:
}
};
-TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) {
+TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfNodeIsFresherThanCurrentPrimary) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled yet.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+}
+
+TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfBothTakeoversAnOption) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"
+ << "priority"
+ << 2)
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled at the
+ // correct time and that a priority takeover has not been scheduled.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+}
+
+TEST_F(TakeoverTest, CatchupTakeoverNotScheduledTwice) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ executor::TaskExecutor::CallbackHandle catchupTakeoverCbh =
+ replCoord->getCatchupTakeoverCbh_forTest();
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+
+ // Mock another round of heartbeat responses
+ now = respondToHeartbeatsUntil(
+ config, now + config.getHeartbeatInterval(), HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure another catchup takeover wasn't scheduled
+ ASSERT_EQUALS(catchupTakeoverTime, replCoord->getCatchupTakeover_forTest().get());
+ ASSERT_TRUE(catchupTakeoverCbh == replCoord->getCatchupTakeoverCbh_forTest());
+}
+
+TEST_F(TakeoverTest, CatchupAndPriorityTakeoverNotScheduledAtSameTime) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"
+ << "priority"
+ << 2)
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+
+ // Mock another heartbeat where the primary is now up to date
+ now = respondToHeartbeatsUntil(
+ config, now + catchupTakeoverDelay / 2, HostAndPort("node2", 12345), currentOptime);
+
+ // Since we are no longer ahead of the primary, we can't schedule a catchup
+ // takeover anymore. But we are still higher priority than the primary, so
+ // after the heartbeat we will try to schedule a priority takeover.
+ // Because we can't schedule two takeovers at the same time and the
+ // catchup takeover hasn't fired yet, make sure that we don't schedule a
+ // priority takeover.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());
+}
+
+TEST_F(TakeoverTest, CatchupTakeoverCallbackCanceledIfElectionTimeoutRuns) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ startCapturingLogMessages();
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+
+ // Fast forward clock to after electionTimeout and black hole all
+ // heartbeat requests to make sure the election timeout runs.
+ Date_t electionTimeout = replCoord->getElectionTimeout_forTest();
+ auto net = getNet();
+ net->enterNetwork();
+ while (net->now() < electionTimeout) {
+ net->runUntil(electionTimeout);
+ while (net->hasReadyRequests()) {
+ auto noi = net->getNextReadyRequest();
+ net->blackHole(noi);
+ }
+ }
+ ASSERT_EQUALS(electionTimeout, net->now());
+ net->exitNetwork();
+
+ stopCapturingLogMessages();
+
+ ASSERT_EQUALS(1, countLogLinesContaining("Starting an election, since we've seen no PRIMARY"));
+
+ // Make sure catchup takeover never happened and CatchupTakeover callback was canceled.
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+ ASSERT(replCoord->getMemberState().secondary());
+ ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover"));
+}
+
+TEST_F(TakeoverTest, CatchupTakeoverCanceledIfTransitionToRollback) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(200, 1), 0);
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+ OpTime behindOptime(Timestamp(100, 1), 0);
+
+ // Make sure we're secondary and that no catchup takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ startCapturingLogMessages();
+
+ // Mock a first round of heartbeat responses, which should give us enough information to know
+ // that we are fresher than the current primary, prompting the scheduling of a catchup
+ // takeover.
+ now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+
+ // Transitioning to rollback state should cancel the takeover
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_ROLLBACK));
+ ASSERT_TRUE(replCoord->getMemberState().rollback());
+
+ stopCapturingLogMessages();
+
+ // Make sure catchup takeover never happened and CatchupTakeover callback was canceled.
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+ ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover"));
+}
+
+TEST_F(TakeoverTest, CatchupTakeoverElectionIsANoop) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1);
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+ HostAndPort primaryHostAndPort("node2", 12345);
+
+ auto replCoord = getReplCoord();
+ auto now = getNet()->now();
+
+ OperationContextNoop opCtx;
+ OpTime currentOptime(Timestamp(100, 5000), 0);
+ OpTime behindOptime(Timestamp(100, 4000), 0);
+
+ replCoord->setMyLastAppliedOpTime(currentOptime);
+ replCoord->setMyLastDurableOpTime(currentOptime);
+
+ // Make sure we're secondary and that no takeover has been scheduled.
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
+
+ // Mock a first round of heartbeat responses.
+ now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, behindOptime);
+
+ // Make sure that the catchup takeover has actually been scheduled and at the
+ // correct time.
+ ASSERT(replCoord->getCatchupTakeover_forTest());
+ auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
+ Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
+ ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);
+
+ startCapturingLogMessages();
+ now = respondToHeartbeatsUntil(config, catchupTakeoverTime, primaryHostAndPort, behindOptime);
+ stopCapturingLogMessages();
+
+ // Make sure that the catchup takeover fired as a NOOP.
+ ASSERT(replCoord->getMemberState().secondary());
+ ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a catchup takeover [NOOP]"));
+}
+
+TEST_F(TakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -962,14 +1339,14 @@ TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityTha
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// Also make sure that updating the term cancels the scheduled priority takeover.
ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, replCoord->getTerm() + 1));
ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());
}
-TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) {
+TEST_F(TakeoverTest, SuccessfulPriorityTakeover) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1009,7 +1386,7 @@ TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) {
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// The priority takeover might be scheduled at a time later than one election
// timeout after our initial heartbeat responses, so mock another round of
@@ -1021,7 +1398,7 @@ TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) {
performSuccessfulPriorityTakeover(priorityTakeoverTime);
}
-TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
+TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1064,7 +1441,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// At this point the other nodes are all ahead of the current node, so it can't call for
// priority takeover.
@@ -1088,7 +1465,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// Now make us caught up enough to call for priority takeover to succeed.
replCoord->setMyLastAppliedOpTime(closeEnoughOpTime);
@@ -1097,7 +1474,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
performSuccessfulPriorityTakeover(priorityTakeoverTime);
}
-TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) {
+TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1139,7 +1516,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// At this point the other nodes are all ahead of the current node, so it can't call for
// priority takeover.
@@ -1163,7 +1540,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon
// correct time.
ASSERT(replCoord->getPriorityTakeover_forTest());
priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
- assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0);
+ assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// Now make us caught up enough to call for priority takeover to succeed.
replCoord->setMyLastAppliedOpTime(closeEnoughOpTime);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 379a18fb231..0298002326c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -274,8 +274,8 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo
break;
}
case HeartbeatResponseAction::PriorityTakeover: {
- // Don't schedule a takeover if one is already scheduled.
- if (!_priorityTakeoverCbh.isValid()) {
+ // Don't schedule a priority takeover if any takeover is already scheduled.
+ if (!_priorityTakeoverCbh.isValid() && !_catchupTakeoverCbh.isValid()) {
// Add randomized offset to calculated priority takeover delay.
Milliseconds priorityTakeoverDelay = _rsConfig.getPriorityTakeoverDelay(_selfIndex);
@@ -290,6 +290,21 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo
}
break;
}
+ case HeartbeatResponseAction::CatchupTakeover: {
+ // Don't schedule a catchup takeover if any takeover is already scheduled.
+ if (!_catchupTakeoverCbh.isValid() && !_priorityTakeoverCbh.isValid()) {
+ Milliseconds catchupTakeoverDelay = _rsConfig.getCatchupTakeoverDelay();
+ _catchupTakeoverWhen = _replExecutor->now() + catchupTakeoverDelay;
+ log() << "Scheduling catchup takeover at " << _catchupTakeoverWhen;
+ _catchupTakeoverCbh = _scheduleWorkAt(
+ _catchupTakeoverWhen, [this](const executor::TaskExecutor::CallbackArgs& args) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _cancelCatchupTakeover_inlock();
+ log() << "Starting an election for a catchup takeover [NOOP]";
+ });
+ }
+ break;
+ }
}
return lock;
}
@@ -705,6 +720,15 @@ void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
}
}
+void ReplicationCoordinatorImpl::_cancelCatchupTakeover_inlock() {
+ if (_catchupTakeoverCbh.isValid()) {  // no-op (and no log line) when nothing is scheduled
+ log() << "Canceling catchup takeover callback";
+ _replExecutor->cancel(_catchupTakeoverCbh);
+ _catchupTakeoverCbh = CallbackHandle();  // forget the handle; isValid() gates rescheduling
+ _catchupTakeoverWhen = Date_t();  // Date_t() means "unscheduled or canceled"
+ }
+}
+
void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
if (_handleElectionTimeoutCbh.isValid()) {
LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen;
@@ -755,6 +779,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(StartElectionV1Reas
// We should always reschedule this callback even if we do not make it to the election
// process.
{
+ _cancelCatchupTakeover_inlock();
_cancelPriorityTakeover_inlock();
_cancelAndRescheduleElectionTimeout_inlock();
if (_inShutdown) {
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index e6ab1309da8..de85d892552 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -1301,20 +1301,34 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
// Clear last heartbeat message on ourselves.
setMyHeartbeatMessage(now, "");
- // Priority takeover when the replset is stable.
+ // Takeover when the replset is stable.
//
// Take over the primary only if the remote primary is in the latest term I know.
// This is done only when we get a heartbeat response from the primary.
// Otherwise, there must be an outstanding election, which may succeed or not, but
// the remote primary will become aware of that election eventually and step down.
- if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex &&
- _rsConfig.getMemberAt(primaryIndex).getPriority() <
+ if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {
+
+ if (_memberData.at(primaryIndex).getLastAppliedOpTime() <
+ _memberData.at(_selfIndex).getLastAppliedOpTime()) {
+ LOG(2) << "I can take over the primary due to fresher data."
+ << " Current primary index: " << primaryIndex << " in term "
+ << _memberData.at(primaryIndex).getTerm() << "."
+ << " Current primary optime: "
+ << _memberData.at(primaryIndex).getLastAppliedOpTime()
+ << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();
+
+ return HeartbeatResponseAction::makeCatchupTakeoverAction();
+ }
+
+ if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
_rsConfig.getMemberAt(_selfIndex).getPriority()) {
- LOG(4) << "I can take over the primary due to higher priority."
- << " Current primary index: " << primaryIndex << " in term "
- << _memberData.at(primaryIndex).getTerm();
+ LOG(4) << "I can take over the primary due to higher priority."
+ << " Current primary index: " << primaryIndex << " in term "
+ << _memberData.at(primaryIndex).getTerm();
- return HeartbeatResponseAction::makePriorityTakeoverAction();
+ return HeartbeatResponseAction::makePriorityTakeoverAction();
+ }
}
return HeartbeatResponseAction::makeNoAction();
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 783ed22ad04..72dbefe0def 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -3830,6 +3830,76 @@ TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataTermPreventsPriorityTakeover)
}
TEST_F(HeartbeatResponseTestV1,
+ ScheduleACatchupTakeoverWhenElectableAndReceiveHeartbeatFromPrimaryInCatchup) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 6 << "host"
+ << "host7:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ OpTime election = OpTime();
+ OpTime lastOpTimeAppliedSecondary = OpTime(Timestamp(300, 0), 0);
+ OpTime lastOpTimeAppliedPrimary = OpTime(Timestamp(200, 0), 0);
+
+ ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
+ getTopoCoord().getMyMemberData()->setLastAppliedOpTime(lastOpTimeAppliedSecondary, Date_t());
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, election, lastOpTimeAppliedPrimary);
+ ASSERT_EQUALS(HeartbeatResponseAction::CatchupTakeover, nextAction.getAction());
+ ASSERT_EQUALS(1, getCurrentPrimaryIndex());
+}
+
+TEST_F(HeartbeatResponseTestV1,
+ ScheduleACatchupTakeoverWhenBothCatchupAndPriorityTakeoverPossible) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host0:27017"
+ << "priority"
+ << 2)
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 6 << "host"
+ << "host7:27017"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+
+ << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ OpTime election = OpTime();
+ OpTime lastOpTimeAppliedSecondary = OpTime(Timestamp(300, 0), 0);
+ OpTime lastOpTimeAppliedPrimary = OpTime(Timestamp(200, 0), 0);
+
+ ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
+ getTopoCoord().getMyMemberData()->setLastAppliedOpTime(lastOpTimeAppliedSecondary, Date_t());
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, election, lastOpTimeAppliedPrimary);
+ ASSERT_EQUALS(HeartbeatResponseAction::CatchupTakeover, nextAction.getAction());
+ ASSERT_EQUALS(1, getCurrentPrimaryIndex());
+}
+
+TEST_F(HeartbeatResponseTestV1,
ScheduleElectionIfAMajorityOfVotersIsVisibleEvenThoughATrueMajorityIsNot) {
updateConfig(BSON("_id"
<< "rs0"