summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2018-08-14 10:50:21 -0400
committerTess Avitabile <tess.avitabile@mongodb.com>2018-08-22 10:41:55 -0400
commitf5e7c8f3e81fe0cd34d4952ed2b547f3c29e06a4 (patch)
treef6db97b38a429e65abe042b367afcd79cb08ce1e /src/mongo/db
parent76b46c9e1080dc2f80091f06591eb4775d66a340 (diff)
downloadmongo-f5e7c8f3e81fe0cd34d4952ed2b547f3c29e06a4.tar.gz
SERVER-33248 Allow choosing a sync source that we are up to date with if it has a higher lastOpCommitted
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/member_data.cpp8
-rw-r--r--src/mongo/db/repl/member_data.h11
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp76
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp16
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp9
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp58
-rw-r--r--src/mongo/db/repl/topology_coordinator.h7
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp196
8 files changed, 299 insertions, 82 deletions
diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp
index 9aed752d3d3..430e55424ce 100644
--- a/src/mongo/db/repl/member_data.cpp
+++ b/src/mongo/db/repl/member_data.cpp
@@ -45,7 +45,9 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is
_lastResponse.setAppliedOpTime(OpTime());
}
-bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) {
+bool MemberData::setUpValues(Date_t now,
+ ReplSetHeartbeatResponse&& hbResponse,
+ OpTime lastOpCommitted) {
_health = 1;
if (_upSince == Date_t()) {
_upSince = now;
@@ -57,6 +59,10 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
_updatedSinceRestart = true;
_lastHeartbeatMessage.clear();
+ if (!lastOpCommitted.isNull()) {
+ _lastOpCommitted = lastOpCommitted;
+ }
+
if (!hbResponse.hasState()) {
hbResponse.setState(MemberState::RS_UNKNOWN);
}
diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h
index 2598a34f30d..219a7e9f06b 100644
--- a/src/mongo/db/repl/member_data.h
+++ b/src/mongo/db/repl/member_data.h
@@ -74,6 +74,9 @@ public:
OpTime getHeartbeatDurableOpTime() const {
return _lastResponse.hasDurableOpTime() ? _lastResponse.getDurableOpTime() : OpTime();
}
+ OpTime getHeartbeatLastOpCommitted() const {
+ return _lastOpCommitted;
+ }
int getConfigVersion() const {
return _lastResponse.getConfigVersion();
}
@@ -135,9 +138,10 @@ public:
/**
* Sets values in this object from the results of a successful heartbeat command.
+ * 'lastOpCommitted' should be extracted from the heartbeat metadata.
* Returns whether or not the optimes advanced as a result of this heartbeat response.
*/
- bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse);
+ bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse, OpTime lastOpCommitted);
/**
* Sets values in this object from the results of a erroring/failed heartbeat command.
@@ -256,6 +260,11 @@ private:
// Last known OpTime that the replica has applied, whether journaled or unjournaled.
OpTime _lastAppliedOpTime;
+ // OpTime of the most recently committed op of which the node was aware, extracted from the
+ // heartbeat metadata. Note that only arbiters should update their knowledge of the commit point
+ // from heartbeat data.
+ OpTime _lastOpCommitted;
+
// TODO(russotto): Since memberData is kept in config order, _configIndex
// and _isSelf may not be necessary.
// Index of this member in the replica set configuration.
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7a0ebb75d2e..e1e5a0beb63 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -110,8 +110,11 @@ BSONObj makeMetadataObject() {
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
+ * 'lastOpCommitted' is the OpTime of the most recently committed op of which this node is aware.
* 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for
* compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'remoteLastOpCommitted' is the OpTime of the most recently committed op of which the sync source
+ * is aware.
* 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to
* guarantee we have not rolled back since we confirmed the sync source had our minValid.
* 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional
@@ -120,14 +123,17 @@ BSONObj makeMetadataObject() {
* oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point
* as ours, but still cannot be behind ours.
*
- * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8.
+ * TODO (SERVER-27668): Make remoteLastOpApplied, remoteLastOpCommitted, and remoteRBID
+ * non-optional.
*
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
Status checkRemoteOplogStart(const Fetcher::Documents& documents,
OpTimeWithHash lastFetched,
+ OpTime lastOpCommitted,
boost::optional<OpTime> remoteLastOpApplied,
+ boost::optional<OpTime> remoteLastOpCommitted,
int requiredRBID,
boost::optional<int> remoteRBID,
bool requireFresherSyncSource) {
@@ -151,22 +157,11 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
}
}
- // The SyncSourceResolver never checks that the sync source candidate is actually ahead of
- // us. Rather than have it check there with an extra network roundtrip, we check here.
- if (requireFresherSyncSource && remoteLastOpApplied &&
- (*remoteLastOpApplied <= lastFetched.opTime)) {
- return Status(ErrorCodes::InvalidSyncSource,
- str::stream() << "Sync source's last applied OpTime "
- << remoteLastOpApplied->toString()
- << " is not greater than our last fetched OpTime "
- << lastFetched.opTime.toString()
- << ". Choosing new sync source.");
- } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
- // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied
- // since we fetch the sync source's last applied OpTime to determine where to start our
- // OplogFetcher. This is fine since no other node can sync off of an initial syncing node
- // and thus cannot form a sync source cycle. To account for this, we must relax the
- // constraint on our sync source being fresher.
+ // The sync source could be behind us if it rolled back after we selected it. We could have
+ // failed to detect the rollback if it occurred between sync source selection (when we check the
+ // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the
+ // sync source is now behind us, choose a new sync source to prevent going into rollback.
+ if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
return Status(ErrorCodes::InvalidSyncSource,
str::stream() << "Sync source's last applied OpTime "
<< remoteLastOpApplied->toString()
@@ -175,10 +170,36 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
<< ". Choosing new sync source.");
}
- // At this point we know that our sync source has our minValid and is ahead of us, so if our
+ // If 'requireFresherSyncSource' is true, we must check that the sync source's
+ // lastApplied/lastOpCommitted is ahead of us to prevent forming a cycle. Although we check for
+ // this condition in sync source selection, if an undetected rollback occurred between sync
+ // source selection and sync source resolution, this condition may no longer hold.
+ // 'requireFresherSyncSource' is false for initial sync, since no other node can sync off an
+ // initial syncing node, so we do not need to check for cycles. In addition, it would be
+ // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will
+ // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied
+ // OpTime to determine where to start our OplogFetcher.
+ if (requireFresherSyncSource && remoteLastOpApplied && remoteLastOpCommitted &&
+ std::tie(*remoteLastOpApplied, *remoteLastOpCommitted) <=
+ std::tie(lastFetched.opTime, lastOpCommitted)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream()
+ << "Sync source cannot be behind me, and if I am up-to-date with the "
+ "sync source, it must have a higher lastOpCommitted. "
+ << "My last fetched oplog optime: "
+ << lastFetched.opTime.toString()
+ << ", latest oplog optime of sync source: "
+ << remoteLastOpApplied->toString()
+ << ", my lastOpCommitted: "
+ << lastOpCommitted.toString()
+ << ", lastOpCommitted of sync source: "
+ << remoteLastOpCommitted->toString());
+ }
+
+ // At this point we know that our sync source has our minValid and is not behind us, so if our
// history diverges from our sync source's we should prefer its history and roll back ours.
- // Since we checked for rollback and our sync source is ahead of us, an empty batch means that
+ // Since we checked for rollback and our sync source is not behind us, an empty batch means that
// we have a higher timestamp on our last fetched OpTime than our sync source's last applied
// OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry.
if (documents.empty()) {
@@ -420,12 +441,17 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
auto remoteLastApplied =
oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
- auto status = checkRemoteOplogStart(documents,
- lastFetched,
- remoteLastApplied,
- _requiredRBID,
- remoteRBID,
- _requireFresherSyncSource);
+ auto remoteLastOpCommitted =
+ oqMetadata ? boost::make_optional(oqMetadata->getLastOpCommitted()) : boost::none;
+ auto status = checkRemoteOplogStart(
+ documents,
+ lastFetched,
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().opTime,
+ remoteLastApplied,
+ remoteLastOpCommitted,
+ _requiredRBID,
+ remoteRBID,
+ _requireFresherSyncSource);
if (!status.isOK()) {
// Stop oplog fetcher and execute rollback if necessary.
return status;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 8f03c6c50cb..ef2985b4d20 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -371,6 +371,22 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
}
TEST_F(OplogFetcherTest,
+ MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadButHasHigherLastOpCommitted) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched.opTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
+ ASSERT_OK(shutdownState->getStatus());
+ ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(OplogFetcherTest,
MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 2b06d004d0a..59f982a129c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -144,6 +144,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
}
ReplSetHeartbeatResponse hbResponse;
+ OpTime lastOpCommitted;
BSONObj resp;
if (responseStatus.isOK()) {
resp = cbData.response.data;
@@ -172,9 +173,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
replMetadata = responseStatus;
}
if (replMetadata.isOK()) {
+ lastOpCommitted = replMetadata.getValue().getLastOpCommitted();
+
// Arbiters are the only nodes allowed to advance their commit point via heartbeats.
if (_getMemberState_inlock().arbiter()) {
- _advanceCommitPoint_inlock(replMetadata.getValue().getLastOpCommitted());
+ _advanceCommitPoint_inlock(lastOpCommitted);
}
// Asynchronous stepdown could happen, but it will wait for _mutex and execute
// after this function, so we cannot and don't need to wait for it to finish.
@@ -204,8 +207,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus);
}
- HeartbeatResponseAction action =
- _topCoord->processHeartbeatResponse(now, networkTime, target, hbStatusResponse);
+ HeartbeatResponseAction action = _topCoord->processHeartbeatResponse(
+ now, networkTime, target, hbStatusResponse, lastOpCommitted);
if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() &&
hbStatusResponse.getValue().hasState() &&
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 0bc6fb7fcbf..a298f5e0ecc 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -377,13 +377,21 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
continue;
}
}
- // only consider candidates that are ahead of where we are
- if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
- LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
- << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
- << ", latest oplog optime of sync candidate "
- << itMemberConfig.getHostAndPort() << ": "
- << it->getHeartbeatAppliedOpTime().toBSON();
+ // Do not select a candidate that is behind me. If I am up to date with the candidate,
+ // only select them if they have a higher lastOpCommitted.
+ if (std::tuple<OpTime, OpTime>(it->getHeartbeatAppliedOpTime(),
+ it->getHeartbeatLastOpCommitted()) <=
+ std::tie(lastOpTimeFetched, _lastCommittedOpTime)) {
+ LOG(1) << "Cannot select this sync source. Sync source cannot be behind me, and if "
+ "I am up-to-date with the sync source, it must have a higher "
+ "lastOpCommitted. "
+ << "Sync candidate: " << itMemberConfig.getHostAndPort()
+ << ", my last fetched oplog optime: " << lastOpTimeFetched.toBSON()
+ << ", latest oplog optime of sync candidate: "
+ << it->getHeartbeatAppliedOpTime().toBSON()
+ << ", my lastOpCommitted: " << _lastCommittedOpTime
+ << ", lastOpCommitted of sync candidate: "
+ << it->getHeartbeatLastOpCommitted();
continue;
}
// Candidate cannot be more latent than anything we've already considered.
@@ -684,7 +692,8 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+ OpTime lastOpCommitted) {
const MemberState originalState = getMemberState();
PingStats& hbStats = _pings[target];
invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
@@ -794,7 +803,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
} else {
ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId();
- advancedOpTime = hbData.setUpValues(now, std::move(hbr));
+ advancedOpTime = hbData.setUpValues(now, std::move(hbr), lastOpCommitted);
}
HeartbeatResponseAction nextAction;
@@ -1349,7 +1358,8 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
hbResponse.setSyncingTo(HostAndPort());
_memberData.at(primaryIndex)
.setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
- std::move(hbResponse));
+ std::move(hbResponse),
+ _memberData.at(primaryIndex).getHeartbeatLastOpCommitted());
}
_currentPrimaryIndex = primaryIndex;
}
@@ -1635,6 +1645,9 @@ void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
const auto heartbeatDurableOpTime = memberData.getHeartbeatDurableOpTime();
entry.append("heartbeatDurableOpTime", heartbeatDurableOpTime.toBSON());
+ const auto lastOpCommitted = memberData.getHeartbeatLastOpCommitted();
+ entry.append("heartbeatLastOpCommitted", lastOpCommitted.toBSON());
+
if (_selfIndex >= 0) {
const int memberId = memberData.getMemberId();
invariant(memberId >= 0);
@@ -2489,18 +2502,25 @@ bool TopologyCoordinator::shouldChangeSyncSource(
// If OplogQueryMetadata was provided, use its values, otherwise use the ones in
// ReplSetMetadata.
OpTime currentSourceOpTime;
+ OpTime currentSourceLastOpCommitted;
int syncSourceIndex = -1;
int primaryIndex = -1;
if (oqMetadata) {
currentSourceOpTime =
std::max(oqMetadata->getLastOpApplied(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ currentSourceLastOpCommitted =
+ std::max(oqMetadata->getLastOpCommitted(),
+ _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
syncSourceIndex = oqMetadata->getSyncSourceIndex();
primaryIndex = oqMetadata->getPrimaryIndex();
} else {
currentSourceOpTime =
std::max(replMetadata.getLastOpVisible(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ currentSourceLastOpCommitted =
+ std::max(replMetadata.getLastOpCommitted(),
+ _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
syncSourceIndex = replMetadata.getSyncSourceIndex();
primaryIndex = replMetadata.getPrimaryIndex();
}
@@ -2514,13 +2534,21 @@ bool TopologyCoordinator::shouldChangeSyncSource(
// Change sync source if they are not ahead of us, and don't have a sync source,
// unless they are primary.
const OpTime myLastOpTime = getMyLastAppliedOpTime();
- if (syncSourceIndex == -1 && currentSourceOpTime <= myLastOpTime &&
+ if (syncSourceIndex == -1 &&
+ std::tie(currentSourceOpTime, currentSourceLastOpCommitted) <=
+ std::tie(myLastOpTime, _lastCommittedOpTime) &&
primaryIndex != currentSourceIndex) {
std::stringstream logMessage;
- logMessage << "Choosing new sync source because our current sync source, "
- << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
- << ") which is not ahead of ours (" << myLastOpTime
- << "), it does not have a sync source, and it's not the primary";
+
+ logMessage << "Choosing new sync source. Our current sync source is not primary and does "
+ "not have a sync source, so we require that it is not behind us, and that if "
+ "we are up-to-date with it, it has a higher lastOpCommitted. "
+ << "Current sync source: " << currentSource.toString()
+ << ", my last fetched oplog optime: " << myLastOpTime
+ << ", latest oplog optime of sync source: " << currentSourceOpTime
+ << ", my lastOpCommitted: " << _lastCommittedOpTime
+ << ", lastOpCommitted of sync source: " << currentSourceLastOpCommitted;
+
if (primaryIndex >= 0) {
logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
} else {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 9416a4653a0..117bc784f75 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -391,8 +391,8 @@ public:
Date_t now, const std::string& ourSetName, const HostAndPort& target);
/**
- * Processes a heartbeat response from "target" that arrived around "now", having
- * spent "networkRoundTripTime" millis on the network.
+ * Processes a heartbeat response from "target" that arrived around "now" with "lastOpCommitted"
+ * in the metadata, having spent "networkRoundTripTime" millis on the network.
*
* Updates internal topology coordinator state, and returns instructions about what action
* to take next.
@@ -414,7 +414,8 @@ public:
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+ OpTime lastOpCommitted);
/**
* Returns whether or not at least 'numNodes' have reached the given opTime.
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 89a8af0d74e..be14cb543bb 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -169,9 +169,10 @@ protected:
// Only set visibleOpTime, primaryIndex and syncSourceIndex
ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(),
int primaryIndex = -1,
- int syncSourceIndex = -1) {
+ int syncSourceIndex = -1,
+ OpTime committedOpTime = OpTime()) {
return ReplSetMetadata(_topo->getTerm(),
- OpTime(),
+ committedOpTime,
visibleOpTime,
_currentConfig.getConfigVersion(),
OID(),
@@ -183,8 +184,10 @@ protected:
// Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
int primaryIndex = -1,
- int syncSourceIndex = -1) {
- return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+ int syncSourceIndex = -1,
+ OpTime committedOpTime = OpTime()) {
+ return OplogQueryMetadata(
+ committedOpTime, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
}
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
@@ -192,13 +195,15 @@ protected:
MemberState memberState,
const OpTime& electionTime,
const OpTime& lastOpTimeSender,
- const HostAndPort& syncingTo = HostAndPort()) {
+ const HostAndPort& syncingTo = HostAndPort(),
+ const OpTime& lastOpCommittedSender = OpTime()) {
return _receiveHeartbeatHelper(Status::OK(),
member,
setName,
memberState,
electionTime.getTimestamp(),
lastOpTimeSender,
+ lastOpCommittedSender,
Milliseconds(1),
syncingTo);
}
@@ -216,6 +221,7 @@ protected:
MemberState::RS_UNKNOWN,
Timestamp(),
OpTime(),
+ OpTime(),
roundTripTime,
HostAndPort());
}
@@ -224,13 +230,15 @@ protected:
const std::string& setName,
MemberState memberState,
const OpTime& lastOpTimeSender,
- Milliseconds roundTripTime = Milliseconds(1)) {
+ Milliseconds roundTripTime = Milliseconds(1),
+ const OpTime& lastOpCommittedSender = OpTime()) {
return _receiveHeartbeatHelper(Status::OK(),
member,
setName,
memberState,
Timestamp(),
lastOpTimeSender,
+ lastOpCommittedSender,
roundTripTime,
HostAndPort());
}
@@ -242,6 +250,7 @@ private:
MemberState memberState,
Timestamp electionTime,
const OpTime& lastOpTimeSender,
+ const OpTime& lastOpCommittedSender,
Milliseconds roundTripTime,
const HostAndPort& syncingTo) {
ReplSetHeartbeatResponse hb;
@@ -259,7 +268,8 @@ private:
getTopoCoord().prepareHeartbeatRequestV1(now(), setName, member);
now() += roundTripTime;
- return getTopoCoord().processHeartbeatResponse(now(), roundTripTime, member, hbResponse);
+ return getTopoCoord().processHeartbeatResponse(
+ now(), roundTripTime, member, hbResponse, lastOpCommittedSender);
}
private:
@@ -574,6 +584,80 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) {
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
+TEST_F(TopoCoordTest, NodeCanChooseSyncSourceWithSameLastAppliedAndHigherLastOpCommitted) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "hself")
+ << BSON("_id" << 10 << "host"
+ << "h1"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+ OpTime ourLastOpCommitted = OpTime(Timestamp(100, 1), 3);
+ OpTime lastOpCommittedSyncSource = OpTime(Timestamp(100, 2), 3);
+
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommittedSyncSource);
+
+ // Record 2nd round of pings to allow choosing a new sync source.
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommittedSyncSource);
+
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(ourLastOpCommitted));
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, NodeCannotChooseSyncSourceWithSameLastAppliedAndSameLastOpCommitted) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "hself")
+ << BSON("_id" << 10 << "host"
+ << "h1"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+ OpTime lastOpCommitted = OpTime(Timestamp(100, 1), 3);
+
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommitted);
+
+ // Record 2nd round of pings to allow choosing a new sync source.
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommitted);
+
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+}
TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
updateConfig(BSON("_id"
@@ -1613,7 +1697,7 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
HostAndPort member = HostAndPort("test0:1234");
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(1), setName, member);
getTopoCoord().processHeartbeatResponse(
- startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood);
+ startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood, OpTime());
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(3), setName, member);
Date_t timeoutTime =
startupTime + Milliseconds(3) + ReplSetConfig::kDefaultHeartbeatTimeoutPeriod;
@@ -1622,12 +1706,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
StatusWith<ReplSetHeartbeatResponse>(Status(ErrorCodes::HostUnreachable, ""));
getTopoCoord().processHeartbeatResponse(
- timeoutTime, Milliseconds(5000), member, hbResponseDown);
+ timeoutTime, Milliseconds(5000), member, hbResponseDown, OpTime());
member = HostAndPort("test1:1234");
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(2), setName, member);
getTopoCoord().processHeartbeatResponse(
- heartbeatTime, Milliseconds(4000), member, hbResponseGood);
+ heartbeatTime, Milliseconds(4000), member, hbResponseGood, OpTime());
makeSelfPrimary(electionTime);
getTopoCoord().setMyLastAppliedOpTime(oplogProgress, startupTime, false);
getTopoCoord().setMyLastDurableOpTime(oplogDurable, startupTime, false);
@@ -1814,7 +1898,7 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter)
std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
getTopoCoord().prepareHeartbeatRequestV1(requestDate, "myset", target);
auto action = getTopoCoord().processHeartbeatResponse(
- requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>());
+ requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>(), OpTime());
Date_t expected(now() + Milliseconds(2500));
ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate());
}
@@ -3397,6 +3481,31 @@ TEST_F(HeartbeatResponseTestV1,
now()));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now()));
+
+ // If we are as up-to-date as this sync source, but it has a higher lastOpCommitted, we will not
+ // change sync sources.
+ OpTime lastOpCommitted = OpTime(Timestamp(100, 0), 0);
+ OpTime newerLastOpCommitted = OpTime(Timestamp(200, 0), 0);
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+ nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ lastOpTimeApplied,
+ HostAndPort(),
+ newerLastOpCommitted);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+ now()));
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+ boost::none,
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -3653,7 +3762,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep
hb.setElectionTime(election.getTimestamp());
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
- now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+ now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
// primary should not be set and we should perform NoAction in response
ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
@@ -3708,7 +3817,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) {
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
getTopoCoord().setMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false);
HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
- now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+ now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
// now primary should be host3, index 1, and we should perform NoAction in response
ASSERT_EQUALS(1, getCurrentPrimaryIndex());
@@ -3884,13 +3993,15 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
ASSERT_OK(getTopoCoord().becomeCandidateIfElectable(
@@ -3937,13 +4048,15 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -3993,12 +4106,14 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -4051,13 +4166,15 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -5526,7 +5643,8 @@ TEST_F(HeartbeatResponseTestV1, NodeDoesNotRetryHeartbeatIfTheFirstFailureTakesT
// no retry allowed.
Milliseconds(4990), // Spent 4.99 of the 5 seconds in the network.
target,
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5626,8 +5744,12 @@ public:
Date_t _upRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T12:55Z"));
std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
getTopoCoord().prepareHeartbeatRequestV1(_upRequestDate, "rs0", _target);
- HeartbeatResponseAction upAction = getTopoCoord().processHeartbeatResponse(
- _upRequestDate, Milliseconds(0), _target, makeStatusWith<ReplSetHeartbeatResponse>());
+ HeartbeatResponseAction upAction =
+ getTopoCoord().processHeartbeatResponse(_upRequestDate,
+ Milliseconds(0),
+ _target,
+ makeStatusWith<ReplSetHeartbeatResponse>(),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, upAction.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5646,8 +5768,8 @@ public:
_firstRequestDate + Seconds(4), // 4 seconds elapsed, retry allowed.
Milliseconds(3990), // Spent 3.99 of the 4 seconds in the network.
_target,
- StatusWith<ReplSetHeartbeatResponse>(
- ErrorCodes::ExceededTimeLimit, "Took too long")); // We've never applied anything.
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime()); // We've never applied anything.
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5704,8 +5826,8 @@ TEST_F(HeartbeatResponseTestOneRetryV1,
// no retry allowed.
Milliseconds(1000), // Spent 1 of the 1.01 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit,
- "Took too long")); // We've never applied anything.
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime()); // We've never applied anything.
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5725,7 +5847,8 @@ public:
// could retry.
Milliseconds(400), // Spent 0.4 of the 0.5 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
// Because the first retry failed without timing out, we expect to retry immediately.
@@ -5772,7 +5895,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing
// could still retry.
Milliseconds(100), // Spent 0.1 of the 0.3 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
// Because this is the second retry, rather than retry again, we expect to wait for a quarter
@@ -5811,7 +5935,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
getTopoCoord().processHeartbeatResponse(firstRequestDate() + Milliseconds(4500),
Milliseconds(400),
target(),
- StatusWith<ReplSetHeartbeatResponse>(response));
+ StatusWith<ReplSetHeartbeatResponse>(response),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5828,7 +5953,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
firstRequestDate() + Milliseconds(7100),
Milliseconds(400),
target(),
- StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}));
+ StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5904,7 +6030,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataSameConfig) {
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host2"),
- StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse));
+ StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1,
@@ -5929,7 +6056,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1,
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host5"),
- StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse));
+ StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1, countLogLinesContaining("Could not find host5:27017 in current config"));