diff options
Diffstat (limited to 'src')
4 files changed, 73 insertions, 2 deletions
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.cpp b/src/mongo/db/repl/repl_set_heartbeat_response.cpp index bb3d4918c02..8ccb6241950 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response.cpp @@ -204,6 +204,11 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term) _isReplSet = doc[kIsReplSetFieldName].trueValue(); + Status termStatus = bsonExtractIntegerField(doc, kTermFieldName, &_term); + if (!termStatus.isOK() && termStatus != ErrorCodes::NoSuchKey) { + return termStatus; + } + // In order to support both the 3.0(V0) and 3.2(V1) heartbeats we must parse the OpTime // field based on its type. If it is a Date, we parse it as the timestamp and use // initialize's term argument to complete the OpTime type. If it is an Object, then it's diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 1cb6b07c698..10822e0f625 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -236,6 +236,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( stdx::unique_lock<stdx::mutex> lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { auto when = _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex); + log() << "Scheduling priority takeover at " << when; _priorityTakeoverCbh = _scheduleWorkAt( when, stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this)); @@ -659,6 +660,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int u void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() { if (_priorityTakeoverCbh.isValid()) { + log() << "Canceling priority takeover callback"; _replExecutor.cancel(_priorityTakeoverCbh); _priorityTakeoverCbh = CallbackHandle(); } @@ -666,6 +668,7 @@ void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() { void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { if (_handleElectionTimeoutCbh.isValid()) { + LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen; _replExecutor.cancel(_handleElectionTimeoutCbh); _handleElectionTimeoutCbh = CallbackHandle(); _handleElectionTimeoutWhen = Date_t(); @@ -693,6 +696,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { auto now = _replExecutor.now(); auto when = now + _rsConfig.getElectionTimeoutPeriod() + randomOffset; invariant(when > now); + LOG(4) << "Scheduling election timeout callback at " << when; _handleElectionTimeoutWhen = when; _handleElectionTimeoutCbh = _scheduleWorkAt( when, stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this)); diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 24a3dfef74f..f200dcd1f5d 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -1093,8 +1093,19 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( setMyHeartbeatMessage(now, ""); _currentPrimaryIndex = remotePrimaryIndex; - if (_rsConfig.getMemberAt(remotePrimaryIndex).getPriority() < - _rsConfig.getMemberAt(_selfIndex).getPriority()) { + + // Priority takeover when the replset is stable. + // + // Take over the primary only if the remote primary is in the latest term I know. + // Otherwise, there must be an outstanding election, which may succeed or not, but + // the remote primary will become aware of that election eventually and step down. + if (_hbdata[remotePrimaryIndex].getTerm() == _term && + _rsConfig.getMemberAt(remotePrimaryIndex).getPriority() < + _rsConfig.getMemberAt(_selfIndex).getPriority()) { + LOG(4) << "I can take over the primary due to higher priority." + << " Current primary index: " << remotePrimaryIndex << " in term " + << _hbdata[remotePrimaryIndex].getTerm(); + return HeartbeatResponseAction::makePriorityTakeoverAction(); } return HeartbeatResponseAction::makeNoAction(); diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 57a0c520e36..6149dae1bb0 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -211,6 +211,7 @@ private: hb.setState(memberState); hb.setOpTime(lastOpTimeSender); hb.setElectionTime(electionTime); + hb.setTerm(getTopoCoord().getTerm()); StatusWith<ReplSetHeartbeatResponse> hbResponse = responseStatus.isOK() ? StatusWith<ReplSetHeartbeatResponse>(hb) @@ -2725,6 +2726,56 @@ TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataPriorTakeoverDueToHigherPrior ASSERT_EQUALS(1, getCurrentPrimaryIndex()); } +TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataTermPreventsPriorityTakeover) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 5 << "members" << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host0:27017" + << "priority" << 2) + << BSON("_id" << 1 << "host" + << "host1:27017" + << "priority" << 3) + << BSON("_id" << 2 << "host" + << "host2:27017")) + << "settings" << BSON("heartbeatTimeoutSecs" << 5)), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + + OpTime election = OpTime(Timestamp(400, 0), 0); + OpTime lastOpTimeApplied = OpTime(Timestamp(300, 0), 0); + + ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); + + // Host 2 is the current primary in term 1. + getTopoCoord().updateTerm(1, now()); + ASSERT_EQUALS(getTopoCoord().getTerm(), 1); + HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_PRIMARY, + election, + election, + lastOpTimeApplied); + ASSERT_EQUALS(HeartbeatResponseAction::PriorityTakeover, nextAction.getAction()); + ASSERT_EQUALS(2, getCurrentPrimaryIndex()); + + now()++; + // Host 1 starts an election due to higher priority by sending vote requests. + // Vote request updates my term. + getTopoCoord().updateTerm(2, now()); + + // This heartbeat shouldn't schedule priority takeover, because the current primary + // host 1 is not in my term. + nextAction = receiveUpHeartbeat(HostAndPort("host1"), + "rs0", + MemberState::RS_SECONDARY, + election, + election, + lastOpTimeApplied); + ASSERT_EQUALS(HeartbeatResponseAction::NoAction, nextAction.getAction()); + ASSERT_EQUALS(2, getCurrentPrimaryIndex()); +} + TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataPrimaryDownMajorityOfVotersUp) { updateConfig(BSON("_id" << "rs0" |