summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2018-08-14 10:50:21 -0400
committerTess Avitabile <tess.avitabile@mongodb.com>2018-08-22 14:23:15 -0400
commit5df9e94b0c4840680d1d17fcf2f04412cf6d70cf (patch)
treecedc2c0f71cc0545ae9c524bce65f4c2ecfa5c4c
parentbce25d5e123843c0e594d6d743981099f236c983 (diff)
downloadmongo-5df9e94b0c4840680d1d17fcf2f04412cf6d70cf.tar.gz
SERVER-33248 Allow choosing a sync source that we are up to date with if it has a higher lastOpCommitted
(cherry picked from commit f5e7c8f3e81fe0cd34d4952ed2b547f3c29e06a4)
-rw-r--r--buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml4
-rw-r--r--buildscripts/resmokelib/testing/fixtures/replicaset.py18
-rw-r--r--jstests/libs/override_methods/check_uuids_consistent_across_cluster.js5
-rw-r--r--jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js12
-rw-r--r--jstests/replsets/dbhash_at_cluster_time.js11
-rw-r--r--jstests/replsets/libs/secondary_reads_test.js4
-rw-r--r--src/mongo/db/repl/member_data.cpp8
-rw-r--r--src/mongo/db/repl/member_data.h11
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp76
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp16
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp9
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp59
-rw-r--r--src/mongo/db/repl/topology_coordinator.h7
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp202
-rw-r--r--src/mongo/shell/replsettest.js19
17 files changed, 319 insertions, 150 deletions
diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml
index 8eaa08c3f36..6ada79e5ca0 100644
--- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml
@@ -136,10 +136,6 @@ selector:
- jstests/core/explain_upsert.js
# The `dbstats` command builds in-memory structures that are not causally consistent.
- jstests/core/dbstats.js
- # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a
- # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks
- # secondary majority reads.
- - jstests/core/restart_catalog.js
exclude_with_any_tags:
- assumes_against_mongod_not_mongos
##
diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml
index bbe744b2286..1a94d618226 100644
--- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml
+++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml
@@ -154,10 +154,6 @@ selector:
- jstests/core/explain_upsert.js
# The `dbstats` command builds in-memory structures that are not causally consistent.
- jstests/core/dbstats.js
- # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a
- # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks
- # secondary majority reads.
- - jstests/core/restart_catalog.js
# These include operations the root user auth'd on the test database is not authorized to perform,
# e.g. dropping or creating system collections.
diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
index aa24ddbc2ef..b3ae37b1873 100644
--- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
@@ -176,10 +176,6 @@ selector:
- jstests/core/orf.js #explain.executionStats is not CC
# The `dbstats` command builds in-memory structures that are not causally consistent.
- jstests/core/dbstats.js
- # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a
- # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks
- # secondary majority reads.
- - jstests/core/restart_catalog.js
exclude_with_any_tags:
# Tests tagged with the following will fail because they assume collections are not sharded.
- assumes_no_implicit_collection_creation_after_drop
diff --git a/buildscripts/resmokelib/testing/fixtures/replicaset.py b/buildscripts/resmokelib/testing/fixtures/replicaset.py
index 437037e88b9..b1966e4ed59 100644
--- a/buildscripts/resmokelib/testing/fixtures/replicaset.py
+++ b/buildscripts/resmokelib/testing/fixtures/replicaset.py
@@ -272,21 +272,13 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst
primary_client = self.nodes[0].mongo_client()
self.auth(primary_client, self.auth_options)
- # Algorithm precondition: All nodes must be in primary/secondary state.
- #
- # 1) Perform a majority write. This will guarantee the primary updates its commit point
- # to the value of this write.
- #
- # 2) Perform a second write. This will guarantee that all nodes will update their commit
- # point to a time that is >= the previous write. That will trigger a stable checkpoint
- # on all nodes.
- # TODO(SERVER-33248): Remove this block. We should not need to prod the replica set to
- # advance the commit point if the commit point being lagged is sufficient to choose a
- # sync source.
+ # All nodes must be in primary/secondary state prior to this point. Perform a majority
+ # write to ensure there is a committed operation on the set. The commit point will
+ # propagate to all members and trigger a stable checkpoint on all persisted storage engines
+ # nodes.
admin = primary_client.get_database(
"admin", write_concern=pymongo.write_concern.WriteConcern(w="majority"))
- admin.command("appendOplogNote", data={"await_stable_checkpoint": 1})
- admin.command("appendOplogNote", data={"await_stable_checkpoint": 2})
+ admin.command("appendOplogNote", data={"await_stable_recovery_timestamp": 1})
for node in self.nodes:
self.logger.info("Waiting for node on port %d to have a stable checkpoint.", node.port)
diff --git a/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js b/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js
index 0a05e19b25c..6412b894ebd 100644
--- a/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js
+++ b/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js
@@ -47,11 +47,6 @@ ShardingTest.prototype.checkUUIDsConsistentAcrossCluster = function() {
continue;
}
var rs = this._rs[i].test;
- // The noop writer needs to be enabled in case a sync source isn't set, so that
- // awaitLastOpCommitted() is guaranteed to finish.
- // SERVER-33248 for reference.
- rs.getPrimary().adminCommand({setParameter: 1, periodicNoopIntervalSecs: 1});
- rs.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true});
var keyFile = this._otherParams.keyFile;
if (keyFile) {
authutil.asCluster(rs.nodes, keyFile, function() {
diff --git a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
index 6c650a2cded..85206f15650 100644
--- a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
+++ b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
@@ -29,20 +29,10 @@
// Create the collection and insert one document. Get the op time of the write.
let res = assert.commandWorked(primaryDB.runCommand(
{insert: collName, documents: [{_id: "before"}], writeConcern: {w: "majority"}}));
- let clusterTimePrimaryBefore;
+ const clusterTimePrimaryBefore = res.opTime.ts;
// Wait for the majority commit point on 'secondaryDB0' to include the {_id: "before"} write.
assert.soonNoExcept(function() {
- // Without a consistent stream of writes, secondary majority reads are not guaranteed
- // to complete, since the commit point being stale is not sufficient to establish a sync
- // source.
- // TODO (SERVER-33248): Remove this write and increase the maxTimeMS on the read.
- res = assert.commandWorked(primaryDB.runCommand(
- {insert: "otherColl", documents: [{a: 1}], writeConcern: {w: "majority"}}));
- assert(res.hasOwnProperty("opTime"), tojson(res));
- assert(res.opTime.hasOwnProperty("ts"), tojson(res));
- clusterTimePrimaryBefore = res.opTime.ts;
-
return assert
.commandWorked(secondaryDB0.runCommand(
{find: collName, readConcern: {level: "majority"}, maxTimeMS: 10000}))
diff --git a/jstests/replsets/dbhash_at_cluster_time.js b/jstests/replsets/dbhash_at_cluster_time.js
index 27f4e0f79db..8c52ab741db 100644
--- a/jstests/replsets/dbhash_at_cluster_time.js
+++ b/jstests/replsets/dbhash_at_cluster_time.js
@@ -19,17 +19,6 @@
const db = session.getDatabase("test");
let txnNumber = 0;
- // We force 'secondary' to sync from 'primary' using the "forceSyncSourceCandidate" failpoint to
- // ensure that an intermittent connectivity issue doesn't lead to the secondary not advancing
- // its belief of the majority commit point. This avoids any complications that would arise due
- // to SERVER-33248.
- assert.commandWorked(secondary.adminCommand({
- configureFailPoint: "forceSyncSourceCandidate",
- mode: "alwaysOn",
- data: {hostAndPort: primary.host}
- }));
- rst.awaitSyncSource(secondary, primary);
-
// We also prevent all nodes in the replica set from advancing oldest_timestamp. This ensures
// that the snapshot associated with 'clusterTime' is retained for the duration of this test.
rst.nodes.forEach(conn => {
diff --git a/jstests/replsets/libs/secondary_reads_test.js b/jstests/replsets/libs/secondary_reads_test.js
index 2ca1117b6ed..da22f9b73b5 100644
--- a/jstests/replsets/libs/secondary_reads_test.js
+++ b/jstests/replsets/libs/secondary_reads_test.js
@@ -24,9 +24,7 @@ function SecondaryReadsTest(name = "secondary_reads_test") {
* two-node replica set running with the latest version.
*/
function performStandardSetup() {
- // TODO: Periodic noop writes can be removed once SERVER-33248 is complete.
- let replSet = new ReplSetTest(
- {name, nodes: 2, nodeOptions: {setParameter: {writePeriodicNoops: true}}});
+ let replSet = new ReplSetTest({name, nodes: 2});
replSet.startSet();
const nodes = replSet.nodeList();
diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp
index 0ea383532b0..5cf86a04b21 100644
--- a/src/mongo/db/repl/member_data.cpp
+++ b/src/mongo/db/repl/member_data.cpp
@@ -45,7 +45,9 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is
_lastResponse.setAppliedOpTime(OpTime());
}
-bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) {
+bool MemberData::setUpValues(Date_t now,
+ ReplSetHeartbeatResponse&& hbResponse,
+ OpTime lastOpCommitted) {
_health = 1;
if (_upSince == Date_t()) {
_upSince = now;
@@ -56,6 +58,10 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
_lastUpdateStale = false;
_updatedSinceRestart = true;
+ if (!lastOpCommitted.isNull()) {
+ _lastOpCommitted = lastOpCommitted;
+ }
+
if (!hbResponse.hasState()) {
hbResponse.setState(MemberState::RS_UNKNOWN);
}
diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h
index ab484a64469..fb6e46443ca 100644
--- a/src/mongo/db/repl/member_data.h
+++ b/src/mongo/db/repl/member_data.h
@@ -74,6 +74,9 @@ public:
OpTime getHeartbeatDurableOpTime() const {
return _lastResponse.hasDurableOpTime() ? _lastResponse.getDurableOpTime() : OpTime();
}
+ OpTime getHeartbeatLastOpCommitted() const {
+ return _lastOpCommitted;
+ }
int getConfigVersion() const {
return _lastResponse.getConfigVersion();
}
@@ -141,9 +144,10 @@ public:
/**
* Sets values in this object from the results of a successful heartbeat command.
+ * 'lastOpCommitted' should be extracted from the heartbeat metadata.
* Returns whether or not the optimes advanced as a result of this heartbeat response.
*/
- bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse);
+ bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse, OpTime lastOpCommitted);
/**
* Sets values in this object from the results of a erroring/failed heartbeat command.
@@ -259,6 +263,11 @@ private:
// Last known OpTime that the replica has applied, whether journaled or unjournaled.
OpTime _lastAppliedOpTime;
+ // OpTime of the most recently committed op of which the node was aware, extracted from the
+ // heartbeat metadata. Note that only arbiters should update their knowledge of the commit point
+ // from heartbeat data.
+ OpTime _lastOpCommitted;
+
// TODO(russotto): Since memberData is kept in config order, _configIndex
// and _isSelf may not be necessary.
// Index of this member in the replica set configuration.
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 28af2d5a834..df1f58e83a8 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -117,8 +117,11 @@ BSONObj makeMetadataObject(bool isV1ElectionProtocol) {
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
+ * 'lastOpCommitted' is the OpTime of the most recently committed op of which this node is aware.
* 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for
* compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'remoteLastOpCommitted' is the OpTime of the most recently committed op of which the sync source
+ * is aware.
* 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to
* guarantee we have not rolled back since we confirmed the sync source had our minValid.
* 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional
@@ -127,14 +130,17 @@ BSONObj makeMetadataObject(bool isV1ElectionProtocol) {
* oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point
* as ours, but still cannot be behind ours.
*
- * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8.
+ * TODO (SERVER-27668): Make remoteLastOpApplied, remoteLastOpCommitted, and remoteRBID
+ * non-optional.
*
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
Status checkRemoteOplogStart(const Fetcher::Documents& documents,
OpTimeWithHash lastFetched,
+ OpTime lastOpCommitted,
boost::optional<OpTime> remoteLastOpApplied,
+ boost::optional<OpTime> remoteLastOpCommitted,
int requiredRBID,
boost::optional<int> remoteRBID,
bool requireFresherSyncSource) {
@@ -158,22 +164,11 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
}
}
- // The SyncSourceResolver never checks that the sync source candidate is actually ahead of
- // us. Rather than have it check there with an extra network roundtrip, we check here.
- if (requireFresherSyncSource && remoteLastOpApplied &&
- (*remoteLastOpApplied <= lastFetched.opTime)) {
- return Status(ErrorCodes::InvalidSyncSource,
- str::stream() << "Sync source's last applied OpTime "
- << remoteLastOpApplied->toString()
- << " is not greater than our last fetched OpTime "
- << lastFetched.opTime.toString()
- << ". Choosing new sync source.");
- } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
- // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied
- // since we fetch the sync source's last applied OpTime to determine where to start our
- // OplogFetcher. This is fine since no other node can sync off of an initial syncing node
- // and thus cannot form a sync source cycle. To account for this, we must relax the
- // constraint on our sync source being fresher.
+ // The sync source could be behind us if it rolled back after we selected it. We could have
+ // failed to detect the rollback if it occurred between sync source selection (when we check the
+ // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the
+ // sync source is now behind us, choose a new sync source to prevent going into rollback.
+ if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
return Status(ErrorCodes::InvalidSyncSource,
str::stream() << "Sync source's last applied OpTime "
<< remoteLastOpApplied->toString()
@@ -182,10 +177,36 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
<< ". Choosing new sync source.");
}
- // At this point we know that our sync source has our minValid and is ahead of us, so if our
+ // If 'requireFresherSyncSource' is true, we must check that the sync source's
+ // lastApplied/lastOpCommitted is ahead of us to prevent forming a cycle. Although we check for
+ // this condition in sync source selection, if an undetected rollback occurred between sync
+ // source selection and sync source resolution, this condition may no longer hold.
+ // 'requireFresherSyncSource' is false for initial sync, since no other node can sync off an
+ // initial syncing node, so we do not need to check for cycles. In addition, it would be
+ // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will
+ // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied
+ // OpTime to determine where to start our OplogFetcher.
+ if (requireFresherSyncSource && remoteLastOpApplied && remoteLastOpCommitted &&
+ std::tie(*remoteLastOpApplied, *remoteLastOpCommitted) <=
+ std::tie(lastFetched.opTime, lastOpCommitted)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream()
+ << "Sync source cannot be behind me, and if I am up-to-date with the "
+ "sync source, it must have a higher lastOpCommitted. "
+ << "My last fetched oplog optime: "
+ << lastFetched.opTime.toString()
+ << ", latest oplog optime of sync source: "
+ << remoteLastOpApplied->toString()
+ << ", my lastOpCommitted: "
+ << lastOpCommitted.toString()
+ << ", lastOpCommitted of sync source: "
+ << remoteLastOpCommitted->toString());
+ }
+
+ // At this point we know that our sync source has our minValid and is not behind us, so if our
// history diverges from our sync source's we should prefer its history and roll back ours.
- // Since we checked for rollback and our sync source is ahead of us, an empty batch means that
+ // Since we checked for rollback and our sync source is not behind us, an empty batch means that
// we have a higher timestamp on our last fetched OpTime than our sync source's last applied
// OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry.
if (documents.empty()) {
@@ -427,12 +448,17 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
auto remoteLastApplied =
oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
- auto status = checkRemoteOplogStart(documents,
- lastFetched,
- remoteLastApplied,
- _requiredRBID,
- remoteRBID,
- _requireFresherSyncSource);
+ auto remoteLastOpCommitted =
+ oqMetadata ? boost::make_optional(oqMetadata->getLastOpCommitted()) : boost::none;
+ auto status = checkRemoteOplogStart(
+ documents,
+ lastFetched,
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().opTime,
+ remoteLastApplied,
+ remoteLastOpCommitted,
+ _requiredRBID,
+ remoteRBID,
+ _requireFresherSyncSource);
if (!status.isOK()) {
// Stop oplog fetcher and execute rollback if necessary.
return status;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 5209ea0c4e3..9e0fc7378c1 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -359,6 +359,22 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
}
TEST_F(OplogFetcherTest,
+ MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadButHasHigherLastOpCommitted) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched.opTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false);
+ ASSERT_OK(shutdownState->getStatus());
+ ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(OplogFetcherTest,
MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 3a1a9cba884..64c06d47182 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -152,6 +152,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
}
ReplSetHeartbeatResponse hbResponse;
+ OpTime lastOpCommitted;
BSONObj resp;
if (responseStatus.isOK()) {
resp = cbData.response.data;
@@ -180,9 +181,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
replMetadata = responseStatus;
}
if (replMetadata.isOK()) {
+ lastOpCommitted = replMetadata.getValue().getLastOpCommitted();
+
// Arbiters are the only nodes allowed to advance their commit point via heartbeats.
if (_getMemberState_inlock().arbiter()) {
- _advanceCommitPoint_inlock(replMetadata.getValue().getLastOpCommitted());
+ _advanceCommitPoint_inlock(lastOpCommitted);
}
// Asynchronous stepdown could happen, but it will wait for _mutex and execute
// after this function, so we cannot and don't need to wait for it to finish.
@@ -211,8 +214,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus);
}
- HeartbeatResponseAction action =
- _topCoord->processHeartbeatResponse(now, networkTime, target, hbStatusResponse);
+ HeartbeatResponseAction action = _topCoord->processHeartbeatResponse(
+ now, networkTime, target, hbStatusResponse, lastOpCommitted);
if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() &&
hbStatusResponse.getValue().hasState() &&
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 07020fc1fb6..f5e7fdfbe5d 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -385,13 +385,21 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
continue;
}
}
- // only consider candidates that are ahead of where we are
- if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
- LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
- << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
- << ", latest oplog optime of sync candidate "
- << itMemberConfig.getHostAndPort() << ": "
- << it->getHeartbeatAppliedOpTime().toBSON();
+ // Do not select a candidate that is behind me. If I am up to date with the candidate,
+ // only select them if they have a higher lastOpCommitted.
+ if (std::tuple<OpTime, OpTime>(it->getHeartbeatAppliedOpTime(),
+ it->getHeartbeatLastOpCommitted()) <=
+ std::tie(lastOpTimeFetched, _lastCommittedOpTime)) {
+ LOG(1) << "Cannot select this sync source. Sync source cannot be behind me, and if "
+ "I am up-to-date with the sync source, it must have a higher "
+ "lastOpCommitted. "
+ << "Sync candidate: " << itMemberConfig.getHostAndPort()
+ << ", my last fetched oplog optime: " << lastOpTimeFetched.toBSON()
+ << ", latest oplog optime of sync candidate: "
+ << it->getHeartbeatAppliedOpTime().toBSON()
+ << ", my lastOpCommitted: " << _lastCommittedOpTime
+ << ", lastOpCommitted of sync candidate: "
+ << it->getHeartbeatLastOpCommitted();
continue;
}
// Candidate cannot be more latent than anything we've already considered.
@@ -1025,7 +1033,8 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+ OpTime lastOpCommitted) {
const MemberState originalState = getMemberState();
PingStats& hbStats = _pings[target];
invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
@@ -1146,7 +1155,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
<< ", msg: " << hbr.getHbMsg();
- advancedOpTime = hbData.setUpValues(now, std::move(hbr));
+ advancedOpTime = hbData.setUpValues(now, std::move(hbr), lastOpCommitted);
}
HeartbeatResponseAction nextAction;
@@ -1964,7 +1973,8 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
hbResponse.setHbMsg("");
_memberData.at(primaryIndex)
.setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
- std::move(hbResponse));
+ std::move(hbResponse),
+ _memberData.at(primaryIndex).getHeartbeatLastOpCommitted());
}
_currentPrimaryIndex = primaryIndex;
}
@@ -2231,6 +2241,10 @@ void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
entry.append("optime", lastDurableOpTime.getTimestamp());
}
entry.append("host", memberData.getHostAndPort().toString());
+
+ const auto lastOpCommitted = memberData.getHeartbeatLastOpCommitted();
+ entry.append("heartbeatLastOpCommitted", lastOpCommitted.toBSON());
+
if (_selfIndex >= 0) {
const int memberId = memberData.getMemberId();
invariant(memberId >= 0);
@@ -3198,18 +3212,25 @@ bool TopologyCoordinator::shouldChangeSyncSource(
// If OplogQueryMetadata was provided, use its values, otherwise use the ones in
// ReplSetMetadata.
OpTime currentSourceOpTime;
+ OpTime currentSourceLastOpCommitted;
int syncSourceIndex = -1;
int primaryIndex = -1;
if (oqMetadata) {
currentSourceOpTime =
std::max(oqMetadata->getLastOpApplied(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ currentSourceLastOpCommitted =
+ std::max(oqMetadata->getLastOpCommitted(),
+ _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
syncSourceIndex = oqMetadata->getSyncSourceIndex();
primaryIndex = oqMetadata->getPrimaryIndex();
} else {
currentSourceOpTime =
std::max(replMetadata.getLastOpVisible(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ currentSourceLastOpCommitted =
+ std::max(replMetadata.getLastOpCommitted(),
+ _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
syncSourceIndex = replMetadata.getSyncSourceIndex();
primaryIndex = replMetadata.getPrimaryIndex();
}
@@ -3224,12 +3245,20 @@ bool TopologyCoordinator::shouldChangeSyncSource(
// unless they are primary.
const OpTime myLastOpTime = getMyLastAppliedOpTime();
if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
- currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
+ std::tie(currentSourceOpTime, currentSourceLastOpCommitted) <=
+ std::tie(myLastOpTime, _lastCommittedOpTime) &&
+ primaryIndex != currentSourceIndex) {
std::stringstream logMessage;
- logMessage << "Choosing new sync source because our current sync source, "
- << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
- << ") which is not ahead of ours (" << myLastOpTime
- << "), it does not have a sync source, and it's not the primary";
+
+ logMessage << "Choosing new sync source. Our current sync source is not primary and does "
+ "not have a sync source, so we require that it is not behind us, and that if "
+ "we are up-to-date with it, it has a higher lastOpCommitted. "
+ << "Current sync source: " << currentSource.toString()
+ << ", my last fetched oplog optime: " << myLastOpTime
+ << ", latest oplog optime of sync source: " << currentSourceOpTime
+ << ", my lastOpCommitted: " << _lastCommittedOpTime
+ << ", lastOpCommitted of sync source: " << currentSourceLastOpCommitted;
+
if (primaryIndex >= 0) {
logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
} else {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index e7052b79d68..2de3df6d95c 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -424,8 +424,8 @@ public:
Date_t now, const std::string& ourSetName, const HostAndPort& target);
/**
- * Processes a heartbeat response from "target" that arrived around "now", having
- * spent "networkRoundTripTime" millis on the network.
+ * Processes a heartbeat response from "target" that arrived around "now" with "lastOpCommitted"
+ * in the metadata, having spent "networkRoundTripTime" millis on the network.
*
* Updates internal topology coordinator state, and returns instructions about what action
* to take next.
@@ -454,7 +454,8 @@ public:
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+ OpTime lastOpCommitted);
/**
* Returns whether or not at least 'numNodes' have reached the given opTime.
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 29925c68e9b..4ca986b47f7 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -170,9 +170,10 @@ protected:
// Only set visibleOpTime, primaryIndex and syncSourceIndex
ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(),
int primaryIndex = -1,
- int syncSourceIndex = -1) {
+ int syncSourceIndex = -1,
+ OpTime committedOpTime = OpTime()) {
return ReplSetMetadata(_topo->getTerm(),
- OpTime(),
+ committedOpTime,
visibleOpTime,
_currentConfig.getConfigVersion(),
OID(),
@@ -184,8 +185,10 @@ protected:
// Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
int primaryIndex = -1,
- int syncSourceIndex = -1) {
- return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+ int syncSourceIndex = -1,
+ OpTime committedOpTime = OpTime()) {
+ return OplogQueryMetadata(
+ committedOpTime, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
}
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
@@ -193,13 +196,15 @@ protected:
MemberState memberState,
const OpTime& electionTime,
const OpTime& lastOpTimeSender,
- const HostAndPort& syncingTo = HostAndPort()) {
+ const HostAndPort& syncingTo = HostAndPort(),
+ const OpTime& lastOpCommittedSender = OpTime()) {
return _receiveHeartbeatHelper(Status::OK(),
member,
setName,
memberState,
electionTime.getTimestamp(),
lastOpTimeSender,
+ lastOpCommittedSender,
Milliseconds(1),
syncingTo);
}
@@ -217,6 +222,7 @@ protected:
MemberState::RS_UNKNOWN,
Timestamp(),
OpTime(),
+ OpTime(),
roundTripTime,
HostAndPort());
}
@@ -225,13 +231,15 @@ protected:
const std::string& setName,
MemberState memberState,
const OpTime& lastOpTimeSender,
- Milliseconds roundTripTime = Milliseconds(1)) {
+ Milliseconds roundTripTime = Milliseconds(1),
+ const OpTime& lastOpCommittedSender = OpTime()) {
return _receiveHeartbeatHelper(Status::OK(),
member,
setName,
memberState,
Timestamp(),
lastOpTimeSender,
+ lastOpCommittedSender,
roundTripTime,
HostAndPort());
}
@@ -243,6 +251,7 @@ private:
MemberState memberState,
Timestamp electionTime,
const OpTime& lastOpTimeSender,
+ const OpTime& lastOpCommittedSender,
Milliseconds roundTripTime,
const HostAndPort& syncingTo) {
ReplSetHeartbeatResponse hb;
@@ -260,7 +269,8 @@ private:
getTopoCoord().prepareHeartbeatRequestV1(now(), setName, member);
now() += roundTripTime;
- return getTopoCoord().processHeartbeatResponse(now(), roundTripTime, member, hbResponse);
+ return getTopoCoord().processHeartbeatResponse(
+ now(), roundTripTime, member, hbResponse, lastOpCommittedSender);
}
private:
@@ -575,6 +585,80 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) {
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
+TEST_F(TopoCoordTest, NodeCanChooseSyncSourceWithSameLastAppliedAndHigherLastOpCommitted) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "hself")
+ << BSON("_id" << 10 << "host"
+ << "h1"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+ OpTime ourLastOpCommitted = OpTime(Timestamp(100, 1), 3);
+ OpTime lastOpCommittedSyncSource = OpTime(Timestamp(100, 2), 3);
+
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommittedSyncSource);
+
+ // Record 2nd round of pings to allow choosing a new sync source.
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommittedSyncSource);
+
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(ourLastOpCommitted));
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, NodeCannotChooseSyncSourceWithSameLastAppliedAndSameLastOpCommitted) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "hself")
+ << BSON("_id" << 10 << "host"
+ << "h1"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+ OpTime lastOpCommitted = OpTime(Timestamp(100, 1), 3);
+
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommitted);
+
+ // Record 2nd round of pings to allow choosing a new sync source.
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ lastApplied,
+ Milliseconds(100),
+ lastOpCommitted);
+
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+}
TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
updateConfig(BSON("_id"
@@ -1614,7 +1698,7 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
HostAndPort member = HostAndPort("test0:1234");
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(1), setName, member);
getTopoCoord().processHeartbeatResponse(
- startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood);
+ startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood, OpTime());
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(3), setName, member);
Date_t timeoutTime =
startupTime + Milliseconds(3) + ReplSetConfig::kDefaultHeartbeatTimeoutPeriod;
@@ -1623,12 +1707,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
StatusWith<ReplSetHeartbeatResponse>(Status(ErrorCodes::HostUnreachable, ""));
getTopoCoord().processHeartbeatResponse(
- timeoutTime, Milliseconds(5000), member, hbResponseDown);
+ timeoutTime, Milliseconds(5000), member, hbResponseDown, OpTime());
member = HostAndPort("test1:1234");
getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(2), setName, member);
getTopoCoord().processHeartbeatResponse(
- heartbeatTime, Milliseconds(4000), member, hbResponseGood);
+ heartbeatTime, Milliseconds(4000), member, hbResponseGood, OpTime());
makeSelfPrimary(electionTime);
getTopoCoord().setMyLastAppliedOpTime(oplogProgress, startupTime, false);
getTopoCoord().setMyLastDurableOpTime(oplogDurable, startupTime, false);
@@ -1808,7 +1892,7 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter)
std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
getTopoCoord().prepareHeartbeatRequestV1(requestDate, "myset", target);
auto action = getTopoCoord().processHeartbeatResponse(
- requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>());
+ requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>(), OpTime());
Date_t expected(now() + Milliseconds(2500));
ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate());
}
@@ -3468,6 +3552,31 @@ TEST_F(HeartbeatResponseTestV1,
now()));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now()));
+
+ // If we are as up-to-date as this sync source, but it has a higher lastOpCommitted, we will not
+ // change sync sources.
+ OpTime lastOpCommitted = OpTime(Timestamp(100, 0), 0);
+ OpTime newerLastOpCommitted = OpTime(Timestamp(200, 0), 0);
+ ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+ nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ lastOpTimeApplied,
+ HostAndPort(),
+ newerLastOpCommitted);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+ now()));
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+ boost::none,
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -3724,7 +3833,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep
hb.setElectionTime(election.getTimestamp());
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
- now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+ now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
// primary should not be set and we should perform NoAction in response
ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
@@ -3772,7 +3881,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) {
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
getTopoCoord().setMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false);
HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
- now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+ now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
// now primary should be host3, index 1, and we should perform NoAction in response
ASSERT_EQUALS(1, getCurrentPrimaryIndex());
@@ -3948,13 +4057,15 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
ASSERT_OK(getTopoCoord().becomeCandidateIfElectable(
@@ -4001,13 +4112,15 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -4057,12 +4170,14 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) {
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -4115,13 +4230,15 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host3:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
hbResp.setAppliedOpTime(behindOptime);
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
HostAndPort("host2:27017"),
- StatusWith<ReplSetHeartbeatResponse>(hbResp));
+ StatusWith<ReplSetHeartbeatResponse>(hbResp),
+ OpTime());
getTopoCoord().updateTerm(1, Date_t());
Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -5590,7 +5707,8 @@ TEST_F(HeartbeatResponseTestV1, NodeDoesNotRetryHeartbeatIfTheFirstFailureTakesT
// no retry allowed.
Milliseconds(4990), // Spent 4.99 of the 5 seconds in the network.
target,
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5690,8 +5808,12 @@ public:
Date_t _upRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T12:55Z"));
std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
getTopoCoord().prepareHeartbeatRequestV1(_upRequestDate, "rs0", _target);
- HeartbeatResponseAction upAction = getTopoCoord().processHeartbeatResponse(
- _upRequestDate, Milliseconds(0), _target, makeStatusWith<ReplSetHeartbeatResponse>());
+ HeartbeatResponseAction upAction =
+ getTopoCoord().processHeartbeatResponse(_upRequestDate,
+ Milliseconds(0),
+ _target,
+ makeStatusWith<ReplSetHeartbeatResponse>(),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, upAction.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5710,8 +5832,8 @@ public:
_firstRequestDate + Seconds(4), // 4 seconds elapsed, retry allowed.
Milliseconds(3990), // Spent 3.99 of the 4 seconds in the network.
_target,
- StatusWith<ReplSetHeartbeatResponse>(
- ErrorCodes::ExceededTimeLimit, "Took too long")); // We've never applied anything.
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime()); // We've never applied anything.
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5768,8 +5890,8 @@ TEST_F(HeartbeatResponseTestOneRetryV1,
// no retry allowed.
Milliseconds(1000), // Spent 1 of the 1.01 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit,
- "Took too long")); // We've never applied anything.
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+ OpTime()); // We've never applied anything.
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5789,7 +5911,8 @@ public:
// could retry.
Milliseconds(400), // Spent 0.4 of the 0.5 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
// Because the first retry failed without timing out, we expect to retry immediately.
@@ -5836,7 +5959,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing
// could still retry.
Milliseconds(100), // Spent 0.1 of the 0.3 seconds in the network.
target(),
- StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+ StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
// Because this is the second retry, rather than retry again, we expect to wait for a quarter
@@ -5877,7 +6001,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
getTopoCoord().processHeartbeatResponse(firstRequestDate() + Milliseconds(4500),
Milliseconds(400),
target(),
- StatusWith<ReplSetHeartbeatResponse>(response));
+ StatusWith<ReplSetHeartbeatResponse>(response),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5894,7 +6019,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
firstRequestDate() + Milliseconds(7100),
Milliseconds(400),
target(),
- StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}));
+ StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}),
+ OpTime());
ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5950,7 +6076,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataOldConfig) {
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host2"),
- StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse));
+ StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1, countLogLinesContaining("host2:27017 thinks that we are down"));
@@ -5999,7 +6126,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataSameConfig) {
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host2"),
- StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse));
+ StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1,
@@ -6027,7 +6155,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1,
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host5"),
- StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse));
+ StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1, countLogLinesContaining("Could not find host5:27017 in current config"));
@@ -6053,7 +6182,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1,
now()++, // Time is left.
Milliseconds(400), // Spent 0.4 of the 0.5 second in the network.
HostAndPort("host2"),
- StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse));
+ StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse),
+ OpTime());
stopCapturingLogMessages();
ASSERT_NO_ACTION(action.getAction());
ASSERT_EQUALS(1, countLogLinesContaining("host2:27017 thinks that we are down"));
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index e3ced4ac2e8..3a43b9c41f0 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -1236,17 +1236,10 @@ var ReplSetTest = function(opts) {
let master = rst.getPrimary();
let id = tojson(rst.nodeList());
- // Algorithm precondition: All nodes must be in primary/secondary state.
- //
- // 1) Perform a majority write. This will guarantee the primary updates its commit point
- // to the value of this write.
- //
- // 2) Perform a second write. This will guarantee that all nodes will update their commit
- // point to a time that is >= the previous write. That will trigger a stable checkpoint
- // on all nodes.
- // TODO(SERVER-33248): Remove this block. We should not need to prod the replica set to
- // advance the commit point if the commit point being lagged is sufficient to choose a
- // sync source.
+ // All nodes must be in primary/secondary state prior to this point. Perform a majority
+ // write to ensure there is a committed operation on the set. The commit point will
+ // propagate to all members and trigger a stable checkpoint on all persisted storage engines
+ // nodes.
function advanceCommitPoint(master) {
// Shadow 'db' so that we can call 'advanceCommitPoint' directly on the primary node.
let db = master.getDB('admin');
@@ -1256,6 +1249,10 @@ var ReplSetTest = function(opts) {
"data": {"awaitLastStableCheckpointTimestamp": 1},
"writeConcern": {"w": "majority", "wtimeout": ReplSetTest.kDefaultTimeoutMS}
}));
+
+ // TODO(SERVER-36758): Remove the second write. We should not need to prod the
+ // replica set to advance the commit point once all nodes are running a version with
+ // SERVER-33248. This can be removed once the last stable version includes the fix.
assert.commandWorked(db.adminCommand(
{"appendOplogNote": 1, "data": {"awaitLastStableCheckpointTimestamp": 2}}));
};