diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/heartbeat_response_action.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/heartbeat_response_action.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config_test.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 407 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp | 70 |
11 files changed, 606 insertions, 26 deletions
diff --git a/src/mongo/db/repl/heartbeat_response_action.cpp b/src/mongo/db/repl/heartbeat_response_action.cpp index 7e787d42c96..014e021a4eb 100644 --- a/src/mongo/db/repl/heartbeat_response_action.cpp +++ b/src/mongo/db/repl/heartbeat_response_action.cpp @@ -49,6 +49,12 @@ HeartbeatResponseAction HeartbeatResponseAction::makePriorityTakeoverAction() { return result; } +HeartbeatResponseAction HeartbeatResponseAction::makeCatchupTakeoverAction() { + HeartbeatResponseAction result; + result._action = CatchupTakeover; + return result; +} + HeartbeatResponseAction HeartbeatResponseAction::makeElectAction() { HeartbeatResponseAction result; result._action = StartElection; diff --git a/src/mongo/db/repl/heartbeat_response_action.h b/src/mongo/db/repl/heartbeat_response_action.h index bb009600e5a..191448dd9bc 100644 --- a/src/mongo/db/repl/heartbeat_response_action.h +++ b/src/mongo/db/repl/heartbeat_response_action.h @@ -50,7 +50,8 @@ public: StartElection, StepDownSelf, StepDownRemotePrimary, - PriorityTakeover + PriorityTakeover, + CatchupTakeover }; /** @@ -75,6 +76,12 @@ public: static HeartbeatResponseAction makePriorityTakeoverAction(); /** + * Makes a new action telling the current node to schedule an event to attempt to elect itself + * primary after the appropriate catchup takeover delay. + */ + static HeartbeatResponseAction makeCatchupTakeoverAction(); + + /** * Makes a new action telling the current node to step down as primary. * * It is an error to call this with primaryIndex != the index of the current node. diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index 0cd9945b7dc..cace43edfcd 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -54,6 +54,7 @@ const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10); const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000); const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout); const bool ReplSetConfig::kDefaultChainingAllowed(true); +const Milliseconds ReplSetConfig::kDefaultCatchupTakeoverDelay(30000); namespace { @@ -830,6 +831,10 @@ Milliseconds ReplSetConfig::getPriorityTakeoverDelay(int memberIdx) const { return (priorityRank + 1) * getElectionTimeoutPeriod(); } +Milliseconds ReplSetConfig::getCatchupTakeoverDelay() const { + return kDefaultCatchupTakeoverDelay; +} + int ReplSetConfig::_calculatePriorityRank(double priority) const { int count = 0; for (MemberIterator mem = membersBegin(); mem != membersEnd(); mem++) { diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index 3f226ae9295..0b8b868d747 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -66,6 +66,7 @@ public: static const Seconds kDefaultHeartbeatTimeoutPeriod; static const Milliseconds kDefaultCatchUpTimeoutPeriod; static const bool kDefaultChainingAllowed; + static const Milliseconds kDefaultCatchupTakeoverDelay; /** * Initializes this ReplSetConfig from the contents of "cfg". @@ -340,6 +341,12 @@ public: */ Milliseconds getPriorityTakeoverDelay(int memberIdx) const; + /** + * Returns the duration to wait before running for election when this node + * sees that it is more caught up than the current primary. + */ + Milliseconds getCatchupTakeoverDelay() const; + private: /** * Parses the "settings" subdocument of a replica set configuration. diff --git a/src/mongo/db/repl/repl_set_config_test.cpp b/src/mongo/db/repl/repl_set_config_test.cpp index 8f582195b51..f9d0ec01f9d 100644 --- a/src/mongo/db/repl/repl_set_config_test.cpp +++ b/src/mongo/db/repl/repl_set_config_test.cpp @@ -1632,6 +1632,31 @@ TEST(ReplSetConfig, GetPriorityTakeoverDelay) { ASSERT_EQUALS(Milliseconds(1000), configB.getPriorityTakeoverDelay(4)); } +TEST(ReplSetConfig, GetCatchupTakeoverDelay) { + ReplSetConfig configA; + ASSERT_OK(configA.initialize(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345" + << "priority" + << 1) + << BSON("_id" << 1 << "host" + << "localhost:54321" + << "priority" + << 2) + << BSON("_id" << 2 << "host" + << "localhost:5321" + << "priority" + << 3)) + << "settings" + << BSON("electionTimeoutMillis" << 1000)))); + ASSERT_OK(configA.validate()); + ASSERT_EQUALS(Milliseconds(30000), configA.getCatchupTakeoverDelay()); +} + TEST(ReplSetConfig, ConfirmDefaultValuesOfAndAbilityToSetWriteConcernMajorityJournalDefault) { // PV0, should default to false. ReplSetConfig config; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index bb2c96e7d20..045deec2bae 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -362,6 +362,19 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest( return _priorityTakeoverWhen; } +boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_catchupTakeoverCbh.isValid()) { + return boost::none; + } + return _catchupTakeoverWhen; +} + +executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTakeoverCbh_forTest() + const { + return _catchupTakeoverCbh; +} + OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _getCurrentCommittedSnapshotOpTime_inlock(); @@ -2418,6 +2431,12 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { invariant(_uncommittedSnapshots.empty()); } + // If we are transitioning from secondary, cancel any scheduled takeovers. + if (_memberState.secondary()) { + _cancelCatchupTakeover_inlock(); + _cancelPriorityTakeover_inlock(); + } + _memberState = newState; log() << "transition to " << newState.toString() << rsLog; @@ -2641,6 +2660,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC log() << "This node is not a member of the config"; } + _cancelCatchupTakeover_inlock(); _cancelPriorityTakeover_inlock(); _cancelAndRescheduleElectionTimeout_inlock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 8ec05babf24..1fbc2d059fc 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -361,6 +361,17 @@ public: boost::optional<Date_t> getPriorityTakeover_forTest() const; /** + * Returns the scheduled time of the catchup takeover callback. If a catchup + * takeover has not been scheduled, returns boost::none. + */ + boost::optional<Date_t> getCatchupTakeover_forTest() const; + + /** + * Returns the catchup takeover CallbackHandle. + */ + executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const; + + /** * Simple wrappers around _setLastOptime_inlock to make it easier to test. */ Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); @@ -1033,6 +1044,11 @@ private: void _cancelPriorityTakeover_inlock(); /** + * Cancels all outstanding _catchupTakeover callbacks. + */ + void _cancelCatchupTakeover_inlock(); + + /** * Cancels the current _handleElectionTimeout callback and reschedules a new callback. * Returns immediately otherwise. */ @@ -1282,7 +1298,7 @@ private: // Used for testing only. Date_t _handleElectionTimeoutWhen; // (M) - // Callback Handle used to cancel a scheduled PriorityTakover callback. + // Callback Handle used to cancel a scheduled PriorityTakeover callback. executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh; // (M) // Priority takeover callback will not run before this time. @@ -1290,6 +1306,14 @@ private: // Used for testing only. Date_t _priorityTakeoverWhen; // (M) + // Callback Handle used to cancel a scheduled CatchupTakeover callback. + executor::TaskExecutor::CallbackHandle _catchupTakeoverCbh; // (M) + + // Catchup takeover callback will not run before this time. + // If this date is Date_t(), the callback is either unscheduled or canceled. + // Used for testing only. + Date_t _catchupTakeoverWhen; // (M) + // Callback handle used by _waitForStartUpComplete() to block until configuration // is loaded and external state threads have been started (unless this node is an arbiter). CallbackHandle _finishLoadLocalConfigCbh; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 3bbb4b783d7..6248e4e6423 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -791,16 +791,16 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) { countLogLinesContaining("not becoming primary, we have been superceded already")); } -class PriorityTakeoverTest : public ReplCoordTest { +class TakeoverTest : public ReplCoordTest { public: /* * Verify that a given priority takeover delay is valid. Takeover delays are * verified in terms of bounds since the delay value is randomized. */ - void assertValidTakeoverDelay(ReplSetConfig config, - Date_t now, - Date_t priorityTakeoverTime, - int nodeIndex) { + void assertValidPriorityTakeoverDelay(ReplSetConfig config, + Date_t now, + Date_t priorityTakeoverTime, + int nodeIndex) { Milliseconds priorityTakeoverDelay = priorityTakeoverTime - now; Milliseconds electionTimeout = config.getElectionTimeoutPeriod(); @@ -922,7 +922,384 @@ private: } }; -TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) { +TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfNodeIsFresherThanCurrentPrimary) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled yet. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); +} + +TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfBothTakeoversAnOption) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345" + << "priority" + << 2) + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled at the + // correct time and that a priority takeover has not been scheduled. + ASSERT(replCoord->getCatchupTakeover_forTest()); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); +} + +TEST_F(TakeoverTest, CatchupTakeoverNotScheduledTwice) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + executor::TaskExecutor::CallbackHandle catchupTakeoverCbh = + replCoord->getCatchupTakeoverCbh_forTest(); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); + + // Mock another round of heartbeat responses + now = respondToHeartbeatsUntil( + config, now + config.getHeartbeatInterval(), HostAndPort("node2", 12345), behindOptime); + + // Make sure another catchup takeover wasn't scheduled + ASSERT_EQUALS(catchupTakeoverTime, replCoord->getCatchupTakeover_forTest().get()); + ASSERT_TRUE(catchupTakeoverCbh == replCoord->getCatchupTakeoverCbh_forTest()); +} + +TEST_F(TakeoverTest, CatchupAndPriorityTakeoverNotScheduledAtSameTime) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345" + << "priority" + << 2) + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); + + // Mock another heartbeat where the primary is now up to date + now = respondToHeartbeatsUntil( + config, now + catchupTakeoverDelay / 2, HostAndPort("node2", 12345), currentOptime); + + // Since we are no longer ahead of the primary, we can't schedule a catchup + // takeover anymore. But we are still higher priority than the primary, so + // after the heartbeat we will try to schedule a priority takeover. + // Because we can't schedule two takeovers at the same time and the + // catchup takeover hasn't fired yet, make sure that we don't schedule a + // priority takeover. + ASSERT(replCoord->getCatchupTakeover_forTest()); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); +} + +TEST_F(TakeoverTest, CatchupTakeoverCallbackCanceledIfElectionTimeoutRuns) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + startCapturingLogMessages(); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); + + // Fast forward clock to after electionTimeout and black hole all + // heartbeat requests to make sure the election timeout runs. + Date_t electionTimeout = replCoord->getElectionTimeout_forTest(); + auto net = getNet(); + net->enterNetwork(); + while (net->now() < electionTimeout) { + net->runUntil(electionTimeout); + while (net->hasReadyRequests()) { + auto noi = net->getNextReadyRequest(); + net->blackHole(noi); + } + } + ASSERT_EQUALS(electionTimeout, net->now()); + net->exitNetwork(); + + stopCapturingLogMessages(); + + ASSERT_EQUALS(1, countLogLinesContaining("Starting an election, since we've seen no PRIMARY")); + + // Make sure catchup takeover never happend and CatchupTakeover callback was canceled. + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + ASSERT(replCoord->getMemberState().secondary()); + ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback")); + ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover")); +} + +TEST_F(TakeoverTest, CatchupTakeoverCanceledIfTransitionToRollback) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(200, 1), 0); + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + OpTime behindOptime(Timestamp(100, 1), 0); + + // Make sure we're secondary and that no catchup takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + startCapturingLogMessages(); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we are fresher than the current primary, prompting the scheduling of a catchup + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); + + // Transitioning to rollback state should cancel the takeover + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_ROLLBACK)); + ASSERT_TRUE(replCoord->getMemberState().rollback()); + + stopCapturingLogMessages(); + + // Make sure catchup takeover never happend and CatchupTakeover callback was canceled. + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback")); + ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover")); +} + +TEST_F(TakeoverTest, CatchupTakeoverElectionIsANoop) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + HostAndPort primaryHostAndPort("node2", 12345); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(100, 5000), 0); + OpTime behindOptime(Timestamp(100, 4000), 0); + + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + + // Make sure we're secondary and that no takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses. + now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay); + + startCapturingLogMessages(); + now = respondToHeartbeatsUntil(config, catchupTakeoverTime, primaryHostAndPort, behindOptime); + stopCapturingLogMessages(); + + // Make sure that the catchup takeover fired as a NOOP. + ASSERT(replCoord->getMemberState().secondary()); + ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a catchup takeover [NOOP]")); +} + +TEST_F(TakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -962,14 +1339,14 @@ TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityTha // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // Also make sure that updating the term cancels the scheduled priority takeover. ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, replCoord->getTerm() + 1)); ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); } -TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) { +TEST_F(TakeoverTest, SuccessfulPriorityTakeover) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1009,7 +1386,7 @@ TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) { // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // The priority takeover might be scheduled at a time later than one election // timeout after our initial heartbeat responses, so mock another round of @@ -1021,7 +1398,7 @@ TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) { performSuccessfulPriorityTakeover(priorityTakeoverTime); } -TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { +TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1064,7 +1441,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // At this point the other nodes are all ahead of the current node, so it can't call for // priority takeover. @@ -1088,7 +1465,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. replCoord->setMyLastAppliedOpTime(closeEnoughOpTime); @@ -1097,7 +1474,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { performSuccessfulPriorityTakeover(priorityTakeoverTime); } -TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) { +TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1139,7 +1516,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // At this point the other nodes are all ahead of the current node, so it can't call for // priority takeover. @@ -1163,7 +1540,7 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon // correct time. ASSERT(replCoord->getPriorityTakeover_forTest()); priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); - assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. replCoord->setMyLastAppliedOpTime(closeEnoughOpTime); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 379a18fb231..0298002326c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -274,8 +274,8 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo break; } case HeartbeatResponseAction::PriorityTakeover: { - // Don't schedule a takeover if one is already scheduled. - if (!_priorityTakeoverCbh.isValid()) { + // Don't schedule a priority takeover if any takeover is already scheduled. + if (!_priorityTakeoverCbh.isValid() && !_catchupTakeoverCbh.isValid()) { // Add randomized offset to calculated priority takeover delay. Milliseconds priorityTakeoverDelay = _rsConfig.getPriorityTakeoverDelay(_selfIndex); @@ -290,6 +290,21 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo } break; } + case HeartbeatResponseAction::CatchupTakeover: { + // Don't schedule a catchup takeover if any takeover is already scheduled. + if (!_catchupTakeoverCbh.isValid() && !_priorityTakeoverCbh.isValid()) { + Milliseconds catchupTakeoverDelay = _rsConfig.getCatchupTakeoverDelay(); + _catchupTakeoverWhen = _replExecutor->now() + catchupTakeoverDelay; + log() << "Scheduling catchup takeover at " << _catchupTakeoverWhen; + _catchupTakeoverCbh = _scheduleWorkAt( + _catchupTakeoverWhen, [this](const executor::TaskExecutor::CallbackArgs& args) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _cancelCatchupTakeover_inlock(); + log() << "Starting an election for a catchup takeover [NOOP]"; + }); + } + break; + } } return lock; } @@ -705,6 +720,15 @@ void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() { } } +void ReplicationCoordinatorImpl::_cancelCatchupTakeover_inlock() { + if (_catchupTakeoverCbh.isValid()) { + log() << "Canceling catchup takeover callback"; + _replExecutor->cancel(_catchupTakeoverCbh); + _catchupTakeoverCbh = CallbackHandle(); + _catchupTakeoverWhen = Date_t(); + } +} + void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { if (_handleElectionTimeoutCbh.isValid()) { LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen; @@ -755,6 +779,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(StartElectionV1Reas // We should always reschedule this callback even if we do not make it to the election // process. { + _cancelCatchupTakeover_inlock(); _cancelPriorityTakeover_inlock(); _cancelAndRescheduleElectionTimeout_inlock(); if (_inShutdown) { diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index e6ab1309da8..de85d892552 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -1301,20 +1301,34 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( // Clear last heartbeat message on ourselves. setMyHeartbeatMessage(now, ""); - // Priority takeover when the replset is stable. + // Takeover when the replset is stable. // // Take over the primary only if the remote primary is in the latest term I know. // This is done only when we get a heartbeat response from the primary. // Otherwise, there must be an outstanding election, which may succeed or not, but // the remote primary will become aware of that election eventually and step down. - if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex && - _rsConfig.getMemberAt(primaryIndex).getPriority() < + if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) { + + if (_memberData.at(primaryIndex).getLastAppliedOpTime() < + _memberData.at(_selfIndex).getLastAppliedOpTime()) { + LOG(2) << "I can take over the primary due to fresher data." + << " Current primary index: " << primaryIndex << " in term " + << _memberData.at(primaryIndex).getTerm() << "." + << " Current primary optime: " + << _memberData.at(primaryIndex).getLastAppliedOpTime() + << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime(); + + return HeartbeatResponseAction::makeCatchupTakeoverAction(); + } + + if (_rsConfig.getMemberAt(primaryIndex).getPriority() < _rsConfig.getMemberAt(_selfIndex).getPriority()) { - LOG(4) << "I can take over the primary due to higher priority." - << " Current primary index: " << primaryIndex << " in term " - << _memberData.at(primaryIndex).getTerm(); + LOG(4) << "I can take over the primary due to higher priority." + << " Current primary index: " << primaryIndex << " in term " + << _memberData.at(primaryIndex).getTerm(); - return HeartbeatResponseAction::makePriorityTakeoverAction(); + return HeartbeatResponseAction::makePriorityTakeoverAction(); + } } return HeartbeatResponseAction::makeNoAction(); } diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 783ed22ad04..72dbefe0def 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -3830,6 +3830,76 @@ TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataTermPreventsPriorityTakeover) } TEST_F(HeartbeatResponseTestV1, + ScheduleACatchupTakeoverWhenElectableAndReceiveHeartbeatFromPrimaryInCatchup) { + updateConfig(BSON("_id" + << "rs0" + << "version" + << 5 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host1:27017") + << BSON("_id" << 1 << "host" + << "host2:27017") + << BSON("_id" << 6 << "host" + << "host7:27017")) + << "protocolVersion" + << 1 + << "settings" + + << BSON("heartbeatTimeoutSecs" << 5)), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + + OpTime election = OpTime(); + OpTime lastOpTimeAppliedSecondary = OpTime(Timestamp(300, 0), 0); + OpTime lastOpTimeAppliedPrimary = OpTime(Timestamp(200, 0), 0); + + ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); + getTopoCoord().getMyMemberData()->setLastAppliedOpTime(lastOpTimeAppliedSecondary, Date_t()); + HeartbeatResponseAction nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, election, lastOpTimeAppliedPrimary); + ASSERT_EQUALS(HeartbeatResponseAction::CatchupTakeover, nextAction.getAction()); + ASSERT_EQUALS(1, getCurrentPrimaryIndex()); +} + +TEST_F(HeartbeatResponseTestV1, + ScheduleACatchupTakeoverWhenBothCatchupAndPriorityTakeoverPossible) { + updateConfig(BSON("_id" + << "rs0" + << "version" + << 5 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host0:27017" + << "priority" + << 2) + << BSON("_id" << 1 << "host" + << "host2:27017") + << BSON("_id" << 6 << "host" + << "host7:27017")) + << "protocolVersion" + << 1 + << "settings" + + << BSON("heartbeatTimeoutSecs" << 5)), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + + OpTime election = OpTime(); + OpTime lastOpTimeAppliedSecondary = OpTime(Timestamp(300, 0), 0); + OpTime lastOpTimeAppliedPrimary = OpTime(Timestamp(200, 0), 0); + + ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); + getTopoCoord().getMyMemberData()->setLastAppliedOpTime(lastOpTimeAppliedSecondary, Date_t()); + HeartbeatResponseAction nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, election, lastOpTimeAppliedPrimary); + ASSERT_EQUALS(HeartbeatResponseAction::CatchupTakeover, nextAction.getAction()); + ASSERT_EQUALS(1, getCurrentPrimaryIndex()); +} + +TEST_F(HeartbeatResponseTestV1, ScheduleElectionIfAMajorityOfVotersIsVisibleEvenThoughATrueMajorityIsNot) { updateConfig(BSON("_id" << "rs0" |