diff options
author | William Schultz <william.schultz@mongodb.com> | 2017-02-07 17:00:54 -0500 |
---|---|---|
committer | William Schultz <william.schultz@mongodb.com> | 2017-03-01 19:46:14 -0500 |
commit | aa8987a0d92c06c900579c5d38e814cd0647a2d5 (patch) | |
tree | 4758b11245c0960b92b845a19afa59a51a90dcd8 | |
parent | 962e21a702ffa6bb2e90df0b9f9dbd9b79c53f34 (diff) | |
download | mongo-aa8987a0d92c06c900579c5d38e814cd0647a2d5.tar.gz |
SERVER-26830 Randomize priority takeover delay
4 files changed, 287 insertions, 98 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a6d936efc0c..3aeb75b4326 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -383,10 +383,14 @@ Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const { return _handleElectionTimeoutWhen; } -Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { +Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() { + return _getRandomizedElectionOffset(); +} + +boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { - return Date_t(); + return boost::none; } return _priorityTakeoverWhen; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 1e2543367a9..0b357e70d49 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -343,11 +343,17 @@ public: */ Date_t getElectionTimeout_forTest() const; + /* + * Return a randomized offset amount that is scaled in proportion to the size of the + * _electionTimeoutPeriod. + */ + Milliseconds getRandomizedElectionOffset_forTest(); + /** - * Returns scheduled time of priority takeover callback. - * Returns Date_t() if callback is not scheduled. + * Returns the scheduled time of the priority takeover callback. If a priority + * takeover has not been scheduled, returns boost::none. */ - Date_t getPriorityTakeover_forTest() const; + boost::optional<Date_t> getPriorityTakeover_forTest() const; /** * Simple wrappers around _setLastOptime_inlock to make it easier to test. @@ -726,6 +732,12 @@ private: void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); + /* + * Return a randomized offset amount that is scaled in proportion to the size of the + * _electionTimeoutPeriod. Used to add randomization to an election timeout. + */ + Milliseconds _getRandomizedElectionOffset(); + /** * Helper for _handleHeartbeatResponse. * diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 137c6483a6e..8fd94ed8d00 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -53,6 +53,59 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using ApplierState = ReplicationCoordinator::ApplierState; +TEST_F(ReplCoordTest, RandomizedElectionOffsetWithinProperBounds) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplicaSetConfig config = assertMakeRSConfig(configObj); + + Milliseconds electionTimeout = config.getElectionTimeoutPeriod(); + long long randomOffsetUpperBound = durationCount<Milliseconds>(electionTimeout) * + getExternalState()->getElectionTimeoutOffsetLimitFraction(); + Milliseconds randomOffset; + + // Verify for numerous rounds of random number generation. + int rounds = 1000; + for (int i = 0; i < rounds; i++) { + randomOffset = getReplCoord()->getRandomizedElectionOffset_forTest(); + ASSERT_GREATER_THAN_OR_EQUALS(randomOffset, Milliseconds(0)); + ASSERT_LESS_THAN_OR_EQUALS(randomOffset, Milliseconds(randomOffsetUpperBound)); + } +} + +TEST_F(ReplCoordTest, RandomizedElectionOffsetAvoidsDivideByZero) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "protocolVersion" + << 1 + << "settings" + << BSON("electionTimeoutMillis" << 1)); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + + // Make sure that an election timeout of 1ms doesn't make the random number + // generator attempt to divide by zero. + Milliseconds randomOffset = getReplCoord()->getRandomizedElectionOffset_forTest(); + ASSERT_EQ(Milliseconds(0), randomOffset); +} + TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { assertStartSuccess(BSON("_id" << "mySet" @@ -738,46 +791,70 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) { class PriorityTakeoverTest : public ReplCoordTest { public: - void respondToAllHeartbeats(const ReplicaSetConfig& config, - Date_t runUntil, - const HostAndPort& primaryHostAndPort, - const OpTime& otherNodesOpTime) { - auto replCoord = getReplCoord(); + /* + * Verify that a given priority takeover delay is valid. Takeover delays are + * verified in terms of bounds since the delay value is randomized. + */ + void assertValidTakeoverDelay(ReplicaSetConfig config, + Date_t now, + Date_t priorityTakeoverTime, + int nodeIndex) { + + Milliseconds priorityTakeoverDelay = priorityTakeoverTime - now; + Milliseconds electionTimeout = config.getElectionTimeoutPeriod(); + + long long baseTakeoverDelay = + durationCount<Milliseconds>(config.getPriorityTakeoverDelay(nodeIndex)); + long long randomOffsetUpperBound = durationCount<Milliseconds>(electionTimeout) * + getExternalState()->getElectionTimeoutOffsetLimitFraction(); + + auto takeoverDelayUpperBound = Milliseconds(baseTakeoverDelay + randomOffsetUpperBound); + auto takeoverDelayLowerBound = Milliseconds(baseTakeoverDelay); + + ASSERT_GREATER_THAN_OR_EQUALS(priorityTakeoverDelay, takeoverDelayLowerBound); + ASSERT_LESS_THAN_OR_EQUALS(priorityTakeoverDelay, takeoverDelayUpperBound); + } + + /* + * Processes and mocks responses to any pending PV1 heartbeat requests that have been + * scheduled at or before 'until'. For any such scheduled heartbeat requests, the + * heartbeat responses will be mocked at the same time the request was made. So, + * for a heartbeat request made at time 't', the response will be mocked as + * occurring at time 't'. This function will always run the clock forward to time + * 'until'. + * + * The applied & durable optimes of the mocked response will be set to + * 'otherNodesOpTime', and the primary set as 'primaryHostAndPort'. + * + * Returns the time that it ran until, which should always be equal to 'until'. + */ + Date_t respondToHeartbeatsUntil(const ReplicaSetConfig& config, + Date_t until, + const HostAndPort& primaryHostAndPort, + const OpTime& otherNodesOpTime) { auto net = getNet(); net->enterNetwork(); - while (net->now() < runUntil || net->hasReadyRequests()) { - if (net->now() < runUntil) { - net->runUntil(runUntil); - } - auto noi = net->getNextReadyRequest(); - auto&& request = noi->getRequest(); - log() << request.target << " processing " << request.cmdObj; - ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); - ReplSetHeartbeatArgsV1 hbArgs; - if (hbArgs.initialize(request.cmdObj).isOK()) { - ReplSetHeartbeatResponse hbResp; - hbResp.setSetName(config.getReplSetName()); - if (request.target == primaryHostAndPort) { - hbResp.setState(MemberState::RS_PRIMARY); - } else { - hbResp.setState(MemberState::RS_SECONDARY); - } - hbResp.setConfigVersion(config.getConfigVersion()); - hbResp.setTerm(replCoord->getTerm()); - hbResp.setAppliedOpTime(otherNodesOpTime); - hbResp.setDurableOpTime(otherNodesOpTime); - auto response = - makeResponseStatus(hbResp.toBSON(replCoord->isV1ElectionProtocol())); - net->scheduleResponse(noi, net->now(), response); - } else { - error() << "Black holing unexpected request to " << request.target << ": " - << request.cmdObj; - net->blackHole(noi); + + // If 'until' is equal to net->now(), process any currently queued requests and return, + // without running the clock. + if (net->now() == until) { + _respondToHeartbeatsNow(config, primaryHostAndPort, otherNodesOpTime); + } else { + // Otherwise, run clock and process heartbeats along the way. + while (net->now() < until) { + // Run clock forward to time 'until', or until the time of the next queued request. + net->runUntil(until); + _respondToHeartbeatsNow(config, primaryHostAndPort, otherNodesOpTime); } } + net->runReadyNetworkOperations(); net->exitNetwork(); + + ASSERT_EQ(net->now(), until); + + return net->now(); } void performSuccessfulPriorityTakeover(Date_t priorityTakeoverTime) { @@ -797,6 +874,50 @@ public: ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a priority takeover")); ASSERT_EQUALS(1, countLogLinesContaining("election succeeded")); } + +private: + /* + * Processes and schedules mock responses to any PV1 heartbeat requests scheduled at or + * before the current time. Assumes that the caller has already entered the network with + * 'enterNetwork()'. It does not run the virtual clock. + * + * Intended as a helper function only. + */ + void _respondToHeartbeatsNow(const ReplicaSetConfig& config, + const HostAndPort& primaryHostAndPort, + const OpTime& otherNodesOpTime) { + + auto replCoord = getReplCoord(); + auto net = getNet(); + + // Process all requests queued at the present time. + while (net->hasReadyRequests()) { + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + + log() << request.target << " processing " << request.cmdObj; + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + // Make sure the heartbeat request is valid. + ReplSetHeartbeatArgsV1 hbArgs; + ASSERT_OK(hbArgs.initialize(request.cmdObj)); + + // Build the mock heartbeat response. + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(config.getReplSetName()); + if (request.target == primaryHostAndPort) { + hbResp.setState(MemberState::RS_PRIMARY); + } else { + hbResp.setState(MemberState::RS_SECONDARY); + } + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setTerm(replCoord->getTerm()); + hbResp.setAppliedOpTime(otherNodesOpTime); + hbResp.setDurableOpTime(otherNodesOpTime); + auto response = makeResponseStatus(hbResp.toBSON(replCoord->isV1ElectionProtocol())); + net->scheduleResponse(noi, net->now(), response); + } + } }; TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) { @@ -819,25 +940,31 @@ TEST_F(PriorityTakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityTha ReplicaSetConfig config = assertMakeRSConfig(configObj); auto replCoord = getReplCoord(); + auto now = getNet()->now(); OperationContextNoop txn; - OpTime time1(Timestamp(100, 1), 0); - replCoord->setMyLastAppliedOpTime(time1); - replCoord->setMyLastDurableOpTime(time1); - ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + OpTime myOptime(Timestamp(100, 1), 0); + replCoord->setMyLastAppliedOpTime(myOptime); + replCoord->setMyLastDurableOpTime(myOptime); - ASSERT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); + // Make sure we're secondary and that no priority takeover has been scheduled. + ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); - auto now = getNet()->now(); - respondToAllHeartbeats(config, now, HostAndPort("node2", 12345), time1); + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we supersede priorities of all other nodes, prompting the scheduling of a priority + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), myOptime); - ASSERT_NOT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), - replCoord->getPriorityTakeover_forTest()); + // Make sure that the priority takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); - // Updating term cancels priority takeover callback. + // Also make sure that updating the term cancels the scheduled priority takeover. ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&txn, replCoord->getTerm() + 1)); - ASSERT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); } TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) { @@ -860,21 +987,34 @@ TEST_F(PriorityTakeoverTest, SuccessfulPriorityTakeover) { ReplicaSetConfig config = assertMakeRSConfig(configObj); auto replCoord = getReplCoord(); + auto now = getNet()->now(); OperationContextNoop txn; - OpTime time1(Timestamp(100, 1), 0); - replCoord->setMyLastAppliedOpTime(time1); - replCoord->setMyLastDurableOpTime(time1); - ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - - ASSERT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); + OpTime myOptime(Timestamp(100, 1), 0); + replCoord->setMyLastAppliedOpTime(myOptime); + replCoord->setMyLastDurableOpTime(myOptime); - auto now = getNet()->now(); - respondToAllHeartbeats(config, now, HostAndPort("node2", 12345), time1); - - auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest(); - ASSERT_NOT_EQUALS(Date_t(), priorityTakeoverTime); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), priorityTakeoverTime); + // Make sure we're secondary and that no priority takeover has been scheduled. + ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); + + // Mock a first round of heartbeat responses, which should give us enough information to know + // that we supersede priorities of all other nodes, prompting the scheduling of a priority + // takeover. + now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), myOptime); + + // Make sure that the priority takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); + + // The priority takeover might be scheduled at a time later than one election + // timeout after our initial heartbeat responses, so mock another round of + // heartbeat responses to prevent a normal election timeout. + Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2; + now = respondToHeartbeatsUntil( + config, now + halfElectionTimeout, HostAndPort("node2", 12345), myOptime); performSuccessfulPriorityTakeover(priorityTakeoverTime); } @@ -900,44 +1040,53 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { HostAndPort primaryHostAndPort("node2", 12345); auto replCoord = getReplCoord(); + auto timeZero = getNet()->now(); + auto now = getNet()->now(); OperationContextNoop txn; OpTime currentOpTime(Timestamp(100, 5000), 0); OpTime behindOpTime(Timestamp(100, 3999), 0); OpTime closeEnoughOpTime(Timestamp(100, 4000), 0); + replCoord->setMyLastAppliedOpTime(behindOpTime); replCoord->setMyLastDurableOpTime(behindOpTime); - ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - ASSERT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); - - auto now = getNet()->now(); - - respondToAllHeartbeats(config, now, primaryHostAndPort, currentOpTime); + // Make sure we're secondary and that no priority takeover has been scheduled. + ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); - auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest(); - ASSERT_NOT_EQUALS(Date_t(), priorityTakeoverTime); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), priorityTakeoverTime); + // Mock a first round of heartbeat responses. + now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, currentOpTime); + // Make sure that the priority takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); // At this point the other nodes are all ahead of the current node, so it can't call for // priority takeover. startCapturingLogMessages(); - respondToAllHeartbeats(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime); + now = respondToHeartbeatsUntil(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime); stopCapturingLogMessages(); - ASSERT(replCoord->getMemberState().secondary()); ASSERT_EQUALS(1, countLogLinesContaining("Not standing for election because member is not " "caught up enough to the most up-to-date member to " "call for priority takeover")); - now = getNet()->now(); - ASSERT_EQUALS(now, priorityTakeoverTime); - priorityTakeoverTime = replCoord->getPriorityTakeover_forTest(); - ASSERT_NOT_EQUALS(Date_t(), priorityTakeoverTime); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), priorityTakeoverTime); + // Mock another round of heartbeat responses that occur after the previous + // 'priorityTakeoverTime', which should schedule a new priority takeover + Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2; + now = respondToHeartbeatsUntil( + config, timeZero + halfElectionTimeout * 3, primaryHostAndPort, currentOpTime); + + // Make sure that a new priority takeover has been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. replCoord->setMyLastAppliedOpTime(closeEnoughOpTime); @@ -967,6 +1116,8 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon HostAndPort primaryHostAndPort("node2", 12345); auto replCoord = getReplCoord(); + auto timeZero = getNet()->now(); + auto now = getNet()->now(); OperationContextNoop txn; OpTime currentOpTime(Timestamp(100, 0), 0); @@ -974,37 +1125,43 @@ TEST_F(PriorityTakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecon OpTime closeEnoughOpTime(Timestamp(98, 0), 0); replCoord->setMyLastAppliedOpTime(behindOpTime); replCoord->setMyLastDurableOpTime(behindOpTime); - ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - - ASSERT_EQUALS(Date_t(), replCoord->getPriorityTakeover_forTest()); - auto now = getNet()->now(); + // Make sure we're secondary and that no priority takeover has been scheduled. + ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getPriorityTakeover_forTest()); - respondToAllHeartbeats(config, now, primaryHostAndPort, currentOpTime); - auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest(); - ASSERT_NOT_EQUALS(Date_t(), priorityTakeoverTime); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), priorityTakeoverTime); + now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, currentOpTime); + // Make sure that the priority takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); // At this point the other nodes are all ahead of the current node, so it can't call for // priority takeover. startCapturingLogMessages(); - respondToAllHeartbeats(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime); + now = respondToHeartbeatsUntil(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime); stopCapturingLogMessages(); - ASSERT(replCoord->getMemberState().secondary()); ASSERT_EQUALS(1, countLogLinesContaining("Not standing for election because member is not " "caught up enough to the most up-to-date member to " "call for priority takeover")); - now = getNet()->now(); - ASSERT_EQUALS(now, priorityTakeoverTime); - priorityTakeoverTime = replCoord->getPriorityTakeover_forTest(); - ASSERT_NOT_EQUALS(Date_t(), priorityTakeoverTime); - ASSERT_EQUALS(now + config.getPriorityTakeoverDelay(0), priorityTakeoverTime); + // Mock another round of heartbeat responses that occur after the previous + // 'priorityTakeoverTime', which should schedule a new priority takeover + Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2; + now = respondToHeartbeatsUntil( + config, timeZero + halfElectionTimeout * 3, primaryHostAndPort, currentOpTime); + + // Make sure that a new priority takeover has been scheduled and at the + // correct time. + ASSERT(replCoord->getPriorityTakeover_forTest()); + priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get(); + assertValidTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. replCoord->setMyLastAppliedOpTime(closeEnoughOpTime); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 4c85e836bbb..72ff8ccb973 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -72,6 +72,20 @@ MONGO_FP_DECLARE(blockHeartbeatStepdown); using executor::RemoteCommandRequest; +Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset() { + long long electionTimeout = durationCount<Milliseconds>(_rsConfig.getElectionTimeoutPeriod()); + long long randomOffsetUpperBound = + electionTimeout * _externalState->getElectionTimeoutOffsetLimitFraction(); + + // Avoid divide by zero error in random number generator. + if (randomOffsetUpperBound == 0) { + return Milliseconds(0); + } + + int64_t randomOffset = _replExecutor.nextRandomInt64(randomOffsetUpperBound); + return Milliseconds(randomOffset); +} + void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { @@ -276,9 +290,13 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( } case HeartbeatResponseAction::PriorityTakeover: { stdx::unique_lock<stdx::mutex> lk(_mutex); + // Don't schedule a takeover if one is already scheduled. if (!_priorityTakeoverCbh.isValid()) { - _priorityTakeoverWhen = - _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex); + + // Add randomized offset to calculated priority takeover delay. + Milliseconds priorityTakeoverDelay = _rsConfig.getPriorityTakeoverDelay(_selfIndex); + Milliseconds randomOffset = _getRandomizedElectionOffset(); + _priorityTakeoverWhen = _replExecutor.now() + priorityTakeoverDelay + randomOffset; log() << "Scheduling priority takeover at " << _priorityTakeoverWhen; _priorityTakeoverCbh = _scheduleWorkAt( _priorityTakeoverWhen, @@ -803,9 +821,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { return; } - Milliseconds randomOffset = Milliseconds(_replExecutor.nextRandomInt64( - durationCount<Milliseconds>(_rsConfig.getElectionTimeoutPeriod()) * - _externalState->getElectionTimeoutOffsetLimitFraction())); + Milliseconds randomOffset = _getRandomizedElectionOffset(); auto now = _replExecutor.now(); auto when = now + _rsConfig.getElectionTimeoutPeriod() + randomOffset; invariant(when > now); |