diff options
17 files changed, 319 insertions, 150 deletions
diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml index 8eaa08c3f36..6ada79e5ca0 100644 --- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough.yml @@ -136,10 +136,6 @@ selector: - jstests/core/explain_upsert.js # The `dbstats` command builds in-memory structures that are not causally consistent. - jstests/core/dbstats.js - # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a - # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks - # secondary majority reads. - - jstests/core/restart_catalog.js exclude_with_any_tags: - assumes_against_mongod_not_mongos ## diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml index bbe744b2286..1a94d618226 100644 --- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml +++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_passthrough_auth.yml @@ -154,10 +154,6 @@ selector: - jstests/core/explain_upsert.js # The `dbstats` command builds in-memory structures that are not causally consistent. - jstests/core/dbstats.js - # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a - # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks - # secondary majority reads. - - jstests/core/restart_catalog.js # These include operations the root user auth'd on the test database is not authorized to perform, # e.g. dropping or creating system collections. diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml index aa24ddbc2ef..b3ae37b1873 100644 --- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml @@ -176,10 +176,6 @@ selector: - jstests/core/orf.js #explain.executionStats is not CC # The `dbstats` command builds in-memory structures that are not causally consistent. - jstests/core/dbstats.js - # TODO SERVER-33248: Remove this test from the blacklist. The secondary can get stuck without a - # sync source if the RestartCatalog command kills its oplog fetching cursor, which blocks - # secondary majority reads. - - jstests/core/restart_catalog.js exclude_with_any_tags: # Tests tagged with the following will fail because they assume collections are not sharded. - assumes_no_implicit_collection_creation_after_drop diff --git a/buildscripts/resmokelib/testing/fixtures/replicaset.py b/buildscripts/resmokelib/testing/fixtures/replicaset.py index 437037e88b9..b1966e4ed59 100644 --- a/buildscripts/resmokelib/testing/fixtures/replicaset.py +++ b/buildscripts/resmokelib/testing/fixtures/replicaset.py @@ -272,21 +272,13 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst primary_client = self.nodes[0].mongo_client() self.auth(primary_client, self.auth_options) - # Algorithm precondition: All nodes must be in primary/secondary state. - # - # 1) Perform a majority write. This will guarantee the primary updates its commit point - # to the value of this write. - # - # 2) Perform a second write. This will guarantee that all nodes will update their commit - # point to a time that is >= the previous write. That will trigger a stable checkpoint - # on all nodes. - # TODO(SERVER-33248): Remove this block. We should not need to prod the replica set to - # advance the commit point if the commit point being lagged is sufficient to choose a - # sync source. + # All nodes must be in primary/secondary state prior to this point. Perform a majority + # write to ensure there is a committed operation on the set. The commit point will + # propagate to all members and trigger a stable checkpoint on all persisted storage engines + # nodes. admin = primary_client.get_database( "admin", write_concern=pymongo.write_concern.WriteConcern(w="majority")) - admin.command("appendOplogNote", data={"await_stable_checkpoint": 1}) - admin.command("appendOplogNote", data={"await_stable_checkpoint": 2}) + admin.command("appendOplogNote", data={"await_stable_recovery_timestamp": 1}) for node in self.nodes: self.logger.info("Waiting for node on port %d to have a stable checkpoint.", node.port) diff --git a/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js b/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js index 0a05e19b25c..6412b894ebd 100644 --- a/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js +++ b/jstests/libs/override_methods/check_uuids_consistent_across_cluster.js @@ -47,11 +47,6 @@ ShardingTest.prototype.checkUUIDsConsistentAcrossCluster = function() { continue; } var rs = this._rs[i].test; - // The noop writer needs to be enabled in case a sync source isn't set, so that - // awaitLastOpCommitted() is guaranteed to finish. - // SERVER-33248 for reference. - rs.getPrimary().adminCommand({setParameter: 1, periodicNoopIntervalSecs: 1}); - rs.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}); var keyFile = this._otherParams.keyFile; if (keyFile) { authutil.asCluster(rs.nodes, keyFile, function() { diff --git a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js index 6c650a2cded..85206f15650 100644 --- a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js +++ b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js @@ -29,20 +29,10 @@ // Create the collection and insert one document. Get the op time of the write. let res = assert.commandWorked(primaryDB.runCommand( {insert: collName, documents: [{_id: "before"}], writeConcern: {w: "majority"}})); - let clusterTimePrimaryBefore; + const clusterTimePrimaryBefore = res.opTime.ts; // Wait for the majority commit point on 'secondaryDB0' to include the {_id: "before"} write. assert.soonNoExcept(function() { - // Without a consistent stream of writes, secondary majority reads are not guaranteed - // to complete, since the commit point being stale is not sufficient to establish a sync - // source. - // TODO (SERVER-33248): Remove this write and increase the maxTimeMS on the read. - res = assert.commandWorked(primaryDB.runCommand( - {insert: "otherColl", documents: [{a: 1}], writeConcern: {w: "majority"}})); - assert(res.hasOwnProperty("opTime"), tojson(res)); - assert(res.opTime.hasOwnProperty("ts"), tojson(res)); - clusterTimePrimaryBefore = res.opTime.ts; - return assert .commandWorked(secondaryDB0.runCommand( {find: collName, readConcern: {level: "majority"}, maxTimeMS: 10000})) diff --git a/jstests/replsets/dbhash_at_cluster_time.js b/jstests/replsets/dbhash_at_cluster_time.js index 27f4e0f79db..8c52ab741db 100644 --- a/jstests/replsets/dbhash_at_cluster_time.js +++ b/jstests/replsets/dbhash_at_cluster_time.js @@ -19,17 +19,6 @@ const db = session.getDatabase("test"); let txnNumber = 0; - // We force 'secondary' to sync from 'primary' using the "forceSyncSourceCandidate" failpoint to - // ensure that an intermittent connectivity issue doesn't lead to the secondary not advancing - // its belief of the majority commit point. This avoids any complications that would arise due - // to SERVER-33248. - assert.commandWorked(secondary.adminCommand({ - configureFailPoint: "forceSyncSourceCandidate", - mode: "alwaysOn", - data: {hostAndPort: primary.host} - })); - rst.awaitSyncSource(secondary, primary); - // We also prevent all nodes in the replica set from advancing oldest_timestamp. This ensures // that the snapshot associated with 'clusterTime' is retained for the duration of this test. rst.nodes.forEach(conn => { diff --git a/jstests/replsets/libs/secondary_reads_test.js b/jstests/replsets/libs/secondary_reads_test.js index 2ca1117b6ed..da22f9b73b5 100644 --- a/jstests/replsets/libs/secondary_reads_test.js +++ b/jstests/replsets/libs/secondary_reads_test.js @@ -24,9 +24,7 @@ function SecondaryReadsTest(name = "secondary_reads_test") { * two-node replica set running with the latest version. */ function performStandardSetup() { - // TODO: Periodic noop writes can be removed once SERVER-33248 is complete. - let replSet = new ReplSetTest( - {name, nodes: 2, nodeOptions: {setParameter: {writePeriodicNoops: true}}}); + let replSet = new ReplSetTest({name, nodes: 2}); replSet.startSet(); const nodes = replSet.nodeList(); diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp index 0ea383532b0..5cf86a04b21 100644 --- a/src/mongo/db/repl/member_data.cpp +++ b/src/mongo/db/repl/member_data.cpp @@ -45,7 +45,9 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is _lastResponse.setAppliedOpTime(OpTime()); } -bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) { +bool MemberData::setUpValues(Date_t now, + ReplSetHeartbeatResponse&& hbResponse, + OpTime lastOpCommitted) { _health = 1; if (_upSince == Date_t()) { _upSince = now; @@ -56,6 +58,10 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) _lastUpdateStale = false; _updatedSinceRestart = true; + if (!lastOpCommitted.isNull()) { + _lastOpCommitted = lastOpCommitted; + } + if (!hbResponse.hasState()) { hbResponse.setState(MemberState::RS_UNKNOWN); } diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h index ab484a64469..fb6e46443ca 100644 --- a/src/mongo/db/repl/member_data.h +++ b/src/mongo/db/repl/member_data.h @@ -74,6 +74,9 @@ public: OpTime getHeartbeatDurableOpTime() const { return _lastResponse.hasDurableOpTime() ? _lastResponse.getDurableOpTime() : OpTime(); } + OpTime getHeartbeatLastOpCommitted() const { + return _lastOpCommitted; + } int getConfigVersion() const { return _lastResponse.getConfigVersion(); } @@ -141,9 +144,10 @@ public: /** * Sets values in this object from the results of a successful heartbeat command. + * 'lastOpCommitted' should be extracted from the heartbeat metadata. * Returns whether or not the optimes advanced as a result of this heartbeat response. */ - bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse); + bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse, OpTime lastOpCommitted); /** * Sets values in this object from the results of a erroring/failed heartbeat command. @@ -259,6 +263,11 @@ private: // Last known OpTime that the replica has applied, whether journaled or unjournaled. OpTime _lastAppliedOpTime; + // OpTime of the most recently committed op of which the node was aware, extracted from the + // heartbeat metadata. Note that only arbiters should update their knowledge of the commit point + // from heartbeat data. + OpTime _lastOpCommitted; + // TODO(russotto): Since memberData is kept in config order, _configIndex // and _isSelf may not be necessary. // Index of this member in the replica set configuration. diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 28af2d5a834..df1f58e83a8 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -117,8 +117,11 @@ BSONObj makeMetadataObject(bool isV1ElectionProtocol) { * Checks the first batch of results from query. * 'documents' are the first batch of results returned from tailing the remote oplog. * 'lastFetched' optime and hash should be consistent with the predicate in the query. + * 'lastOpCommitted' is the OpTime of the most recently committed op of which this node is aware. * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for * compatibility with 3.4 servers that do not send OplogQueryMetadata. + * 'remoteLastOpCommitted' is the OpTime of the most recently committed op of which the sync source + * is aware. * 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to * guarantee we have not rolled back since we confirmed the sync source had our minValid. * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional @@ -127,14 +130,17 @@ BSONObj makeMetadataObject(bool isV1ElectionProtocol) { * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point * as ours, but still cannot be behind ours. * - * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8. + * TODO (SERVER-27668): Make remoteLastOpApplied, remoteLastOpCommitted, and remoteRBID + * non-optional. * * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in * the remote oplog. */ Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched, + OpTime lastOpCommitted, boost::optional<OpTime> remoteLastOpApplied, + boost::optional<OpTime> remoteLastOpCommitted, int requiredRBID, boost::optional<int> remoteRBID, bool requireFresherSyncSource) { @@ -158,22 +164,11 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, } } - // The SyncSourceResolver never checks that the sync source candidate is actually ahead of - // us. Rather than have it check there with an extra network roundtrip, we check here. - if (requireFresherSyncSource && remoteLastOpApplied && - (*remoteLastOpApplied <= lastFetched.opTime)) { - return Status(ErrorCodes::InvalidSyncSource, - str::stream() << "Sync source's last applied OpTime " - << remoteLastOpApplied->toString() - << " is not greater than our last fetched OpTime " - << lastFetched.opTime.toString() - << ". Choosing new sync source."); - } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) { - // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied - // since we fetch the sync source's last applied OpTime to determine where to start our - // OplogFetcher. This is fine since no other node can sync off of an initial syncing node - // and thus cannot form a sync source cycle. To account for this, we must relax the - // constraint on our sync source being fresher. + // The sync source could be behind us if it rolled back after we selected it. We could have + // failed to detect the rollback if it occurred between sync source selection (when we check the + // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the + // sync source is now behind us, choose a new sync source to prevent going into rollback. + if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) { return Status(ErrorCodes::InvalidSyncSource, str::stream() << "Sync source's last applied OpTime " << remoteLastOpApplied->toString() @@ -182,10 +177,36 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, << ". Choosing new sync source."); } - // At this point we know that our sync source has our minValid and is ahead of us, so if our + // If 'requireFresherSyncSource' is true, we must check that the sync source's + // lastApplied/lastOpCommitted is ahead of us to prevent forming a cycle. Although we check for + // this condition in sync source selection, if an undetected rollback occurred between sync + // source selection and sync source resolution, this condition may no longer hold. + // 'requireFresherSyncSource' is false for initial sync, since no other node can sync off an + // initial syncing node, so we do not need to check for cycles. In addition, it would be + // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will + // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied + // OpTime to determine where to start our OplogFetcher. + if (requireFresherSyncSource && remoteLastOpApplied && remoteLastOpCommitted && + std::tie(*remoteLastOpApplied, *remoteLastOpCommitted) <= + std::tie(lastFetched.opTime, lastOpCommitted)) { + return Status(ErrorCodes::InvalidSyncSource, + str::stream() + << "Sync source cannot be behind me, and if I am up-to-date with the " + "sync source, it must have a higher lastOpCommitted. " + << "My last fetched oplog optime: " + << lastFetched.opTime.toString() + << ", latest oplog optime of sync source: " + << remoteLastOpApplied->toString() + << ", my lastOpCommitted: " + << lastOpCommitted.toString() + << ", lastOpCommitted of sync source: " + << remoteLastOpCommitted->toString()); + } + + // At this point we know that our sync source has our minValid and is not behind us, so if our // history diverges from our sync source's we should prefer its history and roll back ours. - // Since we checked for rollback and our sync source is ahead of us, an empty batch means that + // Since we checked for rollback and our sync source is not behind us, an empty batch means that // we have a higher timestamp on our last fetched OpTime than our sync source's last applied // OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry. if (documents.empty()) { @@ -427,12 +448,17 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; auto remoteLastApplied = oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; - auto status = checkRemoteOplogStart(documents, - lastFetched, - remoteLastApplied, - _requiredRBID, - remoteRBID, - _requireFresherSyncSource); + auto remoteLastOpCommitted = + oqMetadata ? boost::make_optional(oqMetadata->getLastOpCommitted()) : boost::none; + auto status = checkRemoteOplogStart( + documents, + lastFetched, + _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().opTime, + remoteLastApplied, + remoteLastOpCommitted, + _requiredRBID, + remoteRBID, + _requireFresherSyncSource); if (!status.isOK()) { // Stop oplog fetcher and execute rollback if necessary. return status; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 5209ea0c4e3..9e0fc7378c1 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -359,6 +359,22 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead } TEST_F(OplogFetcherTest, + MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadButHasHigherLastOpCommitted) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched.opTime, rbid, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + auto entry = makeNoopOplogEntry(lastFetched); + auto shutdownState = + processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false); + ASSERT_OK(shutdownState->getStatus()); + ASSERT(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 3a1a9cba884..64c06d47182 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -152,6 +152,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( } ReplSetHeartbeatResponse hbResponse; + OpTime lastOpCommitted; BSONObj resp; if (responseStatus.isOK()) { resp = cbData.response.data; @@ -180,9 +181,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( replMetadata = responseStatus; } if (replMetadata.isOK()) { + lastOpCommitted = replMetadata.getValue().getLastOpCommitted(); + // Arbiters are the only nodes allowed to advance their commit point via heartbeats. if (_getMemberState_inlock().arbiter()) { - _advanceCommitPoint_inlock(replMetadata.getValue().getLastOpCommitted()); + _advanceCommitPoint_inlock(lastOpCommitted); } // Asynchronous stepdown could happen, but it will wait for _mutex and execute // after this function, so we cannot and don't need to wait for it to finish. @@ -211,8 +214,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus); } - HeartbeatResponseAction action = - _topCoord->processHeartbeatResponse(now, networkTime, target, hbStatusResponse); + HeartbeatResponseAction action = _topCoord->processHeartbeatResponse( + now, networkTime, target, hbStatusResponse, lastOpCommitted); if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() && hbStatusResponse.getValue().hasState() && diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 07020fc1fb6..f5e7fdfbe5d 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -385,13 +385,21 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, continue; } } - // only consider candidates that are ahead of where we are - if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) { - LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. " - << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON() - << ", latest oplog optime of sync candidate " - << itMemberConfig.getHostAndPort() << ": " - << it->getHeartbeatAppliedOpTime().toBSON(); + // Do not select a candidate that is behind me. If I am up to date with the candidate, + // only select them if they have a higher lastOpCommitted. + if (std::tuple<OpTime, OpTime>(it->getHeartbeatAppliedOpTime(), + it->getHeartbeatLastOpCommitted()) <= + std::tie(lastOpTimeFetched, _lastCommittedOpTime)) { + LOG(1) << "Cannot select this sync source. Sync source cannot be behind me, and if " + "I am up-to-date with the sync source, it must have a higher " + "lastOpCommitted. " + << "Sync candidate: " << itMemberConfig.getHostAndPort() + << ", my last fetched oplog optime: " << lastOpTimeFetched.toBSON() + << ", latest oplog optime of sync candidate: " + << it->getHeartbeatAppliedOpTime().toBSON() + << ", my lastOpCommitted: " << _lastCommittedOpTime + << ", lastOpCommitted of sync candidate: " + << it->getHeartbeatLastOpCommitted(); continue; } // Candidate cannot be more latent than anything we've already considered. @@ -1025,7 +1033,8 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, - const StatusWith<ReplSetHeartbeatResponse>& hbResponse) { + const StatusWith<ReplSetHeartbeatResponse>& hbResponse, + OpTime lastOpCommitted) { const MemberState originalState = getMemberState(); PingStats& hbStats = _pings[target]; invariant(hbStats.getLastHeartbeatStartDate() != Date_t()); @@ -1146,7 +1155,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue()); LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId() << ", msg: " << hbr.getHbMsg(); - advancedOpTime = hbData.setUpValues(now, std::move(hbr)); + advancedOpTime = hbData.setUpValues(now, std::move(hbr), lastOpCommitted); } HeartbeatResponseAction nextAction; @@ -1964,7 +1973,8 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex, hbResponse.setHbMsg(""); _memberData.at(primaryIndex) .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(), - std::move(hbResponse)); + std::move(hbResponse), + _memberData.at(primaryIndex).getHeartbeatLastOpCommitted()); } _currentPrimaryIndex = primaryIndex; } @@ -2231,6 +2241,10 @@ void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) { entry.append("optime", lastDurableOpTime.getTimestamp()); } entry.append("host", memberData.getHostAndPort().toString()); + + const auto lastOpCommitted = memberData.getHeartbeatLastOpCommitted(); + entry.append("heartbeatLastOpCommitted", lastOpCommitted.toBSON()); + if (_selfIndex >= 0) { const int memberId = memberData.getMemberId(); invariant(memberId >= 0); @@ -3198,18 +3212,25 @@ bool TopologyCoordinator::shouldChangeSyncSource( // If OplogQueryMetadata was provided, use its values, otherwise use the ones in // ReplSetMetadata. OpTime currentSourceOpTime; + OpTime currentSourceLastOpCommitted; int syncSourceIndex = -1; int primaryIndex = -1; if (oqMetadata) { currentSourceOpTime = std::max(oqMetadata->getLastOpApplied(), _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); + currentSourceLastOpCommitted = + std::max(oqMetadata->getLastOpCommitted(), + _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted()); syncSourceIndex = oqMetadata->getSyncSourceIndex(); primaryIndex = oqMetadata->getPrimaryIndex(); } else { currentSourceOpTime = std::max(replMetadata.getLastOpVisible(), _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); + currentSourceLastOpCommitted = + std::max(replMetadata.getLastOpCommitted(), + _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted()); syncSourceIndex = replMetadata.getSyncSourceIndex(); primaryIndex = replMetadata.getPrimaryIndex(); } @@ -3224,12 +3245,20 @@ bool TopologyCoordinator::shouldChangeSyncSource( // unless they are primary. const OpTime myLastOpTime = getMyLastAppliedOpTime(); if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 && - currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) { + std::tie(currentSourceOpTime, currentSourceLastOpCommitted) <= + std::tie(myLastOpTime, _lastCommittedOpTime) && + primaryIndex != currentSourceIndex) { std::stringstream logMessage; - logMessage << "Choosing new sync source because our current sync source, " - << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime - << ") which is not ahead of ours (" << myLastOpTime - << "), it does not have a sync source, and it's not the primary"; + + logMessage << "Choosing new sync source. Our current sync source is not primary and does " + "not have a sync source, so we require that it is not behind us, and that if " + "we are up-to-date with it, it has a higher lastOpCommitted. " + << "Current sync source: " << currentSource.toString() + << ", my last fetched oplog optime: " << myLastOpTime + << ", latest oplog optime of sync source: " << currentSourceOpTime + << ", my lastOpCommitted: " << _lastCommittedOpTime + << ", lastOpCommitted of sync source: " << currentSourceLastOpCommitted; + if (primaryIndex >= 0) { logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)"; } else { diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index e7052b79d68..2de3df6d95c 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -424,8 +424,8 @@ public: Date_t now, const std::string& ourSetName, const HostAndPort& target); /** - * Processes a heartbeat response from "target" that arrived around "now", having - * spent "networkRoundTripTime" millis on the network. + * Processes a heartbeat response from "target" that arrived around "now" with "lastOpCommitted" + * in the metadata, having spent "networkRoundTripTime" millis on the network. * * Updates internal topology coordinator state, and returns instructions about what action * to take next. @@ -454,7 +454,8 @@ public: Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, - const StatusWith<ReplSetHeartbeatResponse>& hbResponse); + const StatusWith<ReplSetHeartbeatResponse>& hbResponse, + OpTime lastOpCommitted); /** * Returns whether or not at least 'numNodes' have reached the given opTime. diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 29925c68e9b..4ca986b47f7 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -170,9 +170,10 @@ protected: // Only set visibleOpTime, primaryIndex and syncSourceIndex ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(), int primaryIndex = -1, - int syncSourceIndex = -1) { + int syncSourceIndex = -1, + OpTime committedOpTime = OpTime()) { return ReplSetMetadata(_topo->getTerm(), - OpTime(), + committedOpTime, visibleOpTime, _currentConfig.getConfigVersion(), OID(), @@ -184,8 +185,10 @@ protected: // Only set lastAppliedOpTime, primaryIndex and syncSourceIndex OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(), int primaryIndex = -1, - int syncSourceIndex = -1) { - return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); + int syncSourceIndex = -1, + OpTime committedOpTime = OpTime()) { + return OplogQueryMetadata( + committedOpTime, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); } HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, @@ -193,13 +196,15 @@ protected: MemberState memberState, const OpTime& electionTime, const OpTime& lastOpTimeSender, - const HostAndPort& syncingTo = HostAndPort()) { + const HostAndPort& syncingTo = HostAndPort(), + const OpTime& lastOpCommittedSender = OpTime()) { return _receiveHeartbeatHelper(Status::OK(), member, setName, memberState, electionTime.getTimestamp(), lastOpTimeSender, + lastOpCommittedSender, Milliseconds(1), syncingTo); } @@ -217,6 +222,7 @@ protected: MemberState::RS_UNKNOWN, Timestamp(), OpTime(), + OpTime(), roundTripTime, HostAndPort()); } @@ -225,13 +231,15 @@ protected: const std::string& setName, MemberState memberState, const OpTime& lastOpTimeSender, - Milliseconds roundTripTime = Milliseconds(1)) { + Milliseconds roundTripTime = Milliseconds(1), + const OpTime& lastOpCommittedSender = OpTime()) { return _receiveHeartbeatHelper(Status::OK(), member, setName, memberState, Timestamp(), lastOpTimeSender, + lastOpCommittedSender, roundTripTime, HostAndPort()); } @@ -243,6 +251,7 @@ private: MemberState memberState, Timestamp electionTime, const OpTime& lastOpTimeSender, + const OpTime& lastOpCommittedSender, Milliseconds roundTripTime, const HostAndPort& syncingTo) { ReplSetHeartbeatResponse hb; @@ -260,7 +269,8 @@ private: getTopoCoord().prepareHeartbeatRequestV1(now(), setName, member); now() += roundTripTime; - return getTopoCoord().processHeartbeatResponse(now(), roundTripTime, member, hbResponse); + return getTopoCoord().processHeartbeatResponse( + now(), roundTripTime, member, hbResponse, lastOpCommittedSender); } private: @@ -575,6 +585,80 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) { ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } +TEST_F(TopoCoordTest, NodeCanChooseSyncSourceWithSameLastAppliedAndHigherLastOpCommitted) { + updateConfig(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "hself") + << BSON("_id" << 10 << "host" + << "h1"))), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + OpTime lastApplied = OpTime(Timestamp(100, 3), 3); + OpTime ourLastOpCommitted = OpTime(Timestamp(100, 1), 3); + OpTime lastOpCommittedSyncSource = OpTime(Timestamp(100, 2), 3); + + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + lastApplied, + Milliseconds(100), + lastOpCommittedSyncSource); + + // Record 2nd round of pings to allow choosing a new sync source. + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + lastApplied, + Milliseconds(100), + lastOpCommittedSyncSource); + + ASSERT(getTopoCoord().advanceLastCommittedOpTime(ourLastOpCommitted)); + getTopoCoord().chooseNewSyncSource( + now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress()); +} + +TEST_F(TopoCoordTest, NodeCannotChooseSyncSourceWithSameLastAppliedAndSameLastOpCommitted) { + updateConfig(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "hself") + << BSON("_id" << 10 << "host" + << "h1"))), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + OpTime lastApplied = OpTime(Timestamp(100, 3), 3); + OpTime lastOpCommitted = OpTime(Timestamp(100, 1), 3); + + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + lastApplied, + Milliseconds(100), + lastOpCommitted); + + // Record 2nd round of pings to allow choosing a new sync source. + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + lastApplied, + Milliseconds(100), + lastOpCommitted); + + ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted)); + getTopoCoord().chooseNewSyncSource( + now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + ASSERT(getTopoCoord().getSyncSourceAddress().empty()); +} TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { updateConfig(BSON("_id" @@ -1614,7 +1698,7 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { HostAndPort member = HostAndPort("test0:1234"); getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(1), setName, member); getTopoCoord().processHeartbeatResponse( - startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood); + startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood, OpTime()); getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(3), setName, member); Date_t timeoutTime = startupTime + Milliseconds(3) + ReplSetConfig::kDefaultHeartbeatTimeoutPeriod; @@ -1623,12 +1707,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { StatusWith<ReplSetHeartbeatResponse>(Status(ErrorCodes::HostUnreachable, "")); getTopoCoord().processHeartbeatResponse( - timeoutTime, Milliseconds(5000), member, hbResponseDown); + timeoutTime, Milliseconds(5000), member, hbResponseDown, OpTime()); member = HostAndPort("test1:1234"); getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(2), setName, member); getTopoCoord().processHeartbeatResponse( - heartbeatTime, Milliseconds(4000), member, hbResponseGood); + heartbeatTime, Milliseconds(4000), member, hbResponseGood, OpTime()); makeSelfPrimary(electionTime); getTopoCoord().setMyLastAppliedOpTime(oplogProgress, startupTime, false); getTopoCoord().setMyLastDurableOpTime(oplogDurable, startupTime, false); @@ -1808,7 +1892,7 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter) std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest = getTopoCoord().prepareHeartbeatRequestV1(requestDate, "myset", target); auto action = getTopoCoord().processHeartbeatResponse( - requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>()); + requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>(), OpTime()); Date_t expected(now() + Milliseconds(2500)); ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate()); } @@ -3468,6 +3552,31 @@ TEST_F(HeartbeatResponseTestV1, now())); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now())); + + // If we are as up-to-date as this sync source, but it has a higher lastOpCommitted, we will not + // change sync sources. + OpTime lastOpCommitted = OpTime(Timestamp(100, 0), 0); + OpTime newerLastOpCommitted = OpTime(Timestamp(200, 0), 0); + ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted)); + nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_SECONDARY, + election, + lastOpTimeApplied, + HostAndPort(), + newerLastOpCommitted); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted), + now())); + + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted), + boost::none, + now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { @@ -3724,7 +3833,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep hb.setElectionTime(election.getTimestamp()); StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse( - now()++, Milliseconds(0), HostAndPort("host3"), hbResponse); + now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime()); // primary should not be set and we should perform NoAction in response ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); @@ -3772,7 +3881,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) { StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); getTopoCoord().setMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false); HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse( - now()++, Milliseconds(0), HostAndPort("host3"), hbResponse); + now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime()); // now primary should be host3, index 1, and we should perform NoAction in response ASSERT_EQUALS(1, getCurrentPrimaryIndex()); @@ -3948,13 +4057,15 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) { getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host3:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); hbResp.setAppliedOpTime(behindOptime); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host2:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); getTopoCoord().updateTerm(1, Date_t()); ASSERT_OK(getTopoCoord().becomeCandidateIfElectable( @@ -4001,13 +4112,15 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) { getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host3:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); hbResp.setAppliedOpTime(behindOptime); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host2:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); getTopoCoord().updateTerm(1, Date_t()); Status result = getTopoCoord().becomeCandidateIfElectable( @@ -4057,12 +4170,14 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) { getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host3:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host2:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); getTopoCoord().updateTerm(1, Date_t()); Status result = getTopoCoord().becomeCandidateIfElectable( @@ -4115,13 +4230,15 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host3:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); hbResp.setAppliedOpTime(behindOptime); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), HostAndPort("host2:27017"), - StatusWith<ReplSetHeartbeatResponse>(hbResp)); + StatusWith<ReplSetHeartbeatResponse>(hbResp), + OpTime()); getTopoCoord().updateTerm(1, Date_t()); Status result = getTopoCoord().becomeCandidateIfElectable( @@ -5590,7 +5707,8 @@ TEST_F(HeartbeatResponseTestV1, NodeDoesNotRetryHeartbeatIfTheFirstFailureTakesT // no retry allowed. Milliseconds(4990), // Spent 4.99 of the 5 seconds in the network. target, - StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long")); + StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5690,8 +5808,12 @@ public: Date_t _upRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T12:55Z")); std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest = getTopoCoord().prepareHeartbeatRequestV1(_upRequestDate, "rs0", _target); - HeartbeatResponseAction upAction = getTopoCoord().processHeartbeatResponse( - _upRequestDate, Milliseconds(0), _target, makeStatusWith<ReplSetHeartbeatResponse>()); + HeartbeatResponseAction upAction = + getTopoCoord().processHeartbeatResponse(_upRequestDate, + Milliseconds(0), + _target, + makeStatusWith<ReplSetHeartbeatResponse>(), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, upAction.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5710,8 +5832,8 @@ public: _firstRequestDate + Seconds(4), // 4 seconds elapsed, retry allowed. Milliseconds(3990), // Spent 3.99 of the 4 seconds in the network. _target, - StatusWith<ReplSetHeartbeatResponse>( - ErrorCodes::ExceededTimeLimit, "Took too long")); // We've never applied anything. + StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"), + OpTime()); // We've never applied anything. ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5768,8 +5890,8 @@ TEST_F(HeartbeatResponseTestOneRetryV1, // no retry allowed. Milliseconds(1000), // Spent 1 of the 1.01 seconds in the network. target(), - StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, - "Took too long")); // We've never applied anything. + StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"), + OpTime()); // We've never applied anything. ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5789,7 +5911,8 @@ public: // could retry. Milliseconds(400), // Spent 0.4 of the 0.5 seconds in the network. target(), - StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?")); + StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); // Because the first retry failed without timing out, we expect to retry immediately. @@ -5836,7 +5959,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing // could still retry. Milliseconds(100), // Spent 0.1 of the 0.3 seconds in the network. target(), - StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?")); + StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); // Because this is the second retry, rather than retry again, we expect to wait for a quarter @@ -5877,7 +6001,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures) getTopoCoord().processHeartbeatResponse(firstRequestDate() + Milliseconds(4500), Milliseconds(400), target(), - StatusWith<ReplSetHeartbeatResponse>(response)); + StatusWith<ReplSetHeartbeatResponse>(response), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5894,7 +6019,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures) firstRequestDate() + Milliseconds(7100), Milliseconds(400), target(), - StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""})); + StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}), + OpTime()); ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction()); ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); @@ -5950,7 +6076,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataOldConfig) { now()++, // Time is left. Milliseconds(400), // Spent 0.4 of the 0.5 second in the network. HostAndPort("host2"), - StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse)); + StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse), + OpTime()); stopCapturingLogMessages(); ASSERT_NO_ACTION(action.getAction()); ASSERT_EQUALS(1, countLogLinesContaining("host2:27017 thinks that we are down")); @@ -5999,7 +6126,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataSameConfig) { now()++, // Time is left. Milliseconds(400), // Spent 0.4 of the 0.5 second in the network. HostAndPort("host2"), - StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse)); + StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse), + OpTime()); stopCapturingLogMessages(); ASSERT_NO_ACTION(action.getAction()); ASSERT_EQUALS(1, @@ -6027,7 +6155,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, now()++, // Time is left. Milliseconds(400), // Spent 0.4 of the 0.5 second in the network. HostAndPort("host5"), - StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse)); + StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse), + OpTime()); stopCapturingLogMessages(); ASSERT_NO_ACTION(action.getAction()); ASSERT_EQUALS(1, countLogLinesContaining("Could not find host5:27017 in current config")); @@ -6053,7 +6182,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, now()++, // Time is left. Milliseconds(400), // Spent 0.4 of the 0.5 second in the network. HostAndPort("host2"), - StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse)); + StatusWith<ReplSetHeartbeatResponse>(believesWeAreDownResponse), + OpTime()); stopCapturingLogMessages(); ASSERT_NO_ACTION(action.getAction()); ASSERT_EQUALS(1, countLogLinesContaining("host2:27017 thinks that we are down")); diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index e3ced4ac2e8..3a43b9c41f0 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -1236,17 +1236,10 @@ var ReplSetTest = function(opts) { let master = rst.getPrimary(); let id = tojson(rst.nodeList()); - // Algorithm precondition: All nodes must be in primary/secondary state. - // - // 1) Perform a majority write. This will guarantee the primary updates its commit point - // to the value of this write. - // - // 2) Perform a second write. This will guarantee that all nodes will update their commit - // point to a time that is >= the previous write. That will trigger a stable checkpoint - // on all nodes. - // TODO(SERVER-33248): Remove this block. We should not need to prod the replica set to - // advance the commit point if the commit point being lagged is sufficient to choose a - // sync source. + // All nodes must be in primary/secondary state prior to this point. Perform a majority + // write to ensure there is a committed operation on the set. The commit point will + // propagate to all members and trigger a stable checkpoint on all persisted storage engines + // nodes. function advanceCommitPoint(master) { // Shadow 'db' so that we can call 'advanceCommitPoint' directly on the primary node. let db = master.getDB('admin'); @@ -1256,6 +1249,10 @@ var ReplSetTest = function(opts) { "data": {"awaitLastStableCheckpointTimestamp": 1}, "writeConcern": {"w": "majority", "wtimeout": ReplSetTest.kDefaultTimeoutMS} })); + + // TODO(SERVER-36758): Remove the second write. We should not need to prod the + // replica set to advance the commit point once all nodes are running a version with + // SERVER-33248. This can be removed once the last stable version includes the fix. assert.commandWorked(db.adminCommand( {"appendOplogNote": 1, "data": {"awaitLastStableCheckpointTimestamp": 2}})); }; |