summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/repl_set_heartbeat_response.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp15
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp51
4 files changed, 73 insertions, 2 deletions
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.cpp b/src/mongo/db/repl/repl_set_heartbeat_response.cpp
index bb3d4918c02..8ccb6241950 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_response.cpp
+++ b/src/mongo/db/repl/repl_set_heartbeat_response.cpp
@@ -204,6 +204,11 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term)
_isReplSet = doc[kIsReplSetFieldName].trueValue();
+ Status termStatus = bsonExtractIntegerField(doc, kTermFieldName, &_term);
+ if (!termStatus.isOK() && termStatus != ErrorCodes::NoSuchKey) {
+ return termStatus;
+ }
+
// In order to support both the 3.0(V0) and 3.2(V1) heartbeats we must parse the OpTime
// field based on its type. If it is a Date, we parse it as the timestamp and use
// initialize's term argument to complete the OpTime type. If it is an Object, then it's
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 1cb6b07c698..10822e0f625 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -236,6 +236,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (!_priorityTakeoverCbh.isValid()) {
auto when = _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex);
+ log() << "Scheduling priority takeover at " << when;
_priorityTakeoverCbh = _scheduleWorkAt(
when,
stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this));
@@ -659,6 +660,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int u
void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
if (_priorityTakeoverCbh.isValid()) {
+ log() << "Canceling priority takeover callback";
_replExecutor.cancel(_priorityTakeoverCbh);
_priorityTakeoverCbh = CallbackHandle();
}
@@ -666,6 +668,7 @@ void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
if (_handleElectionTimeoutCbh.isValid()) {
+ LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen;
_replExecutor.cancel(_handleElectionTimeoutCbh);
_handleElectionTimeoutCbh = CallbackHandle();
_handleElectionTimeoutWhen = Date_t();
@@ -693,6 +696,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
auto now = _replExecutor.now();
auto when = now + _rsConfig.getElectionTimeoutPeriod() + randomOffset;
invariant(when > now);
+ LOG(4) << "Scheduling election timeout callback at " << when;
_handleElectionTimeoutWhen = when;
_handleElectionTimeoutCbh = _scheduleWorkAt(
when, stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this));
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 24a3dfef74f..f200dcd1f5d 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -1093,8 +1093,19 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
setMyHeartbeatMessage(now, "");
_currentPrimaryIndex = remotePrimaryIndex;
- if (_rsConfig.getMemberAt(remotePrimaryIndex).getPriority() <
- _rsConfig.getMemberAt(_selfIndex).getPriority()) {
+
+ // Priority takeover when the replset is stable.
+ //
+ // Take over the primary only if the remote primary is in the latest term I know.
+ // Otherwise, there must be an outstanding election, which may succeed or not, but
+ // the remote primary will become aware of that election eventually and step down.
+ if (_hbdata[remotePrimaryIndex].getTerm() == _term &&
+ _rsConfig.getMemberAt(remotePrimaryIndex).getPriority() <
+ _rsConfig.getMemberAt(_selfIndex).getPriority()) {
+ LOG(4) << "I can take over the primary due to higher priority."
+ << " Current primary index: " << remotePrimaryIndex << " in term "
+ << _hbdata[remotePrimaryIndex].getTerm();
+
return HeartbeatResponseAction::makePriorityTakeoverAction();
}
return HeartbeatResponseAction::makeNoAction();
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 57a0c520e36..6149dae1bb0 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -211,6 +211,7 @@ private:
hb.setState(memberState);
hb.setOpTime(lastOpTimeSender);
hb.setElectionTime(electionTime);
+ hb.setTerm(getTopoCoord().getTerm());
StatusWith<ReplSetHeartbeatResponse> hbResponse = responseStatus.isOK()
? StatusWith<ReplSetHeartbeatResponse>(hb)
@@ -2725,6 +2726,56 @@ TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataPriorTakeoverDueToHigherPrior
ASSERT_EQUALS(1, getCurrentPrimaryIndex());
}
+TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataTermPreventsPriorityTakeover) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 5 << "members" << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host0:27017"
+ << "priority" << 2)
+ << BSON("_id" << 1 << "host"
+ << "host1:27017"
+ << "priority" << 3)
+ << BSON("_id" << 2 << "host"
+ << "host2:27017"))
+ << "settings" << BSON("heartbeatTimeoutSecs" << 5)),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ OpTime election = OpTime(Timestamp(400, 0), 0);
+ OpTime lastOpTimeApplied = OpTime(Timestamp(300, 0), 0);
+
+ ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
+
+ // Host 2 is the current primary in term 1.
+ getTopoCoord().updateTerm(1, now());
+ ASSERT_EQUALS(getTopoCoord().getTerm(), 1);
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ election,
+ election,
+ lastOpTimeApplied);
+ ASSERT_EQUALS(HeartbeatResponseAction::PriorityTakeover, nextAction.getAction());
+ ASSERT_EQUALS(2, getCurrentPrimaryIndex());
+
+ now()++;
+ // Host 1 starts an election due to higher priority by sending vote requests.
+ // Vote request updates my term.
+ getTopoCoord().updateTerm(2, now());
+
+ // This heartbeat shouldn't schedule priority takeover, because the current primary
+ // host 1 is not in my term.
+ nextAction = receiveUpHeartbeat(HostAndPort("host1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ election,
+ lastOpTimeApplied);
+ ASSERT_EQUALS(HeartbeatResponseAction::NoAction, nextAction.getAction());
+ ASSERT_EQUALS(2, getCurrentPrimaryIndex());
+}
+
TEST_F(HeartbeatResponseTestV1, UpdateHeartbeatDataPrimaryDownMajorityOfVotersUp) {
updateConfig(BSON("_id"
<< "rs0"