diff options
author | Maria van Keulen <maria@mongodb.com> | 2019-03-20 20:29:38 -0400 |
---|---|---|
committer | Maria van Keulen <maria@mongodb.com> | 2019-04-16 14:35:34 -0400 |
commit | a738647a7281a892f1b35fba8b4e1cdf47bccf56 (patch) | |
tree | c76b5fdf490be227bfa7369285fa61a6255e08e5 /src | |
parent | 89ef031427db9efd6ab657de16fd6e0c17cc7797 (diff) | |
download | mongo-a738647a7281a892f1b35fba8b4e1cdf47bccf56.tar.gz |
SERVER-40078 Report lastCommitted wall clock time
Diffstat (limited to 'src')
45 files changed, 1574 insertions, 937 deletions
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp index 7d39509d971..801548b6686 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp @@ -112,6 +112,7 @@ void AbstractOplogFetcherTest::setUp() { launchExecutorThread(); lastFetched = {{123, 0}, 1}; + lastFetchedWall = Date_t::min() + Seconds(lastFetched.getSecs()); } executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse( diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h index 9d1ec170ec8..2164f93cac6 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h @@ -94,6 +94,7 @@ protected: // The last OpTime fetched by the oplog fetcher. OpTime lastFetched; + Date_t lastFetchedWall; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index ec0ea421550..9c8b9eefe12 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/db/server_options.h" +#include "mongo/db/server_options.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/log.h" #include "mongo/util/str.h" @@ -195,7 +196,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque BSONObj resBSON = response.data; ReplSetHeartbeatResponse hbResp; - Status hbStatus = hbResp.initialize(resBSON, 0); + Status hbStatus = hbResp.initialize(resBSON, 0, /*requireWallTime*/ false); if (hbStatus.code() == ErrorCodes::InconsistentReplicaSetNames) { std::string message = str::stream() << 
"Our set name did not match that of " @@ -226,7 +227,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque if (_rsConfig->hasReplicaSetId()) { StatusWith<rpc::ReplSetMetadata> replMetadata = - rpc::ReplSetMetadata::readFromMetadata(response.data); + rpc::ReplSetMetadata::readFromMetadata(response.data, /*requireWallTime*/ false); if (replMetadata.isOK() && replMetadata.getValue().getReplicaSetId().isSet() && _rsConfig->getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) { std::string message = str::stream() diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index afd4bdf79b1..36807a63c2e 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -239,8 +239,9 @@ executor::RemoteCommandResponse makeHeartbeatResponse(const ReplSetConfig& rsCon hbResp.setConfigVersion(configVersion); // The smallest valid optime in PV1. 
OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + Date_t wallTime = Date_t::min(); + hbResp.setAppliedOpTimeAndWallTime({opTime, wallTime}); + hbResp.setDurableOpTimeAndWallTime({opTime, wallTime}); auto bob = BSONObjBuilder(hbResp.toBSON()); bob.appendElements(extraFields); return RemoteCommandResponse(bob.obj(), duration_cast<Milliseconds>(millis)); @@ -469,8 +470,9 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) { << request.target.toString(); if (request.target == incompatibleHost) { OpTime opTime{Timestamp{10, 10}, 10}; + Date_t wallTime = Date_t::min(); rpc::ReplSetMetadata metadata(opTime.getTerm(), - opTime, + {opTime, wallTime}, opTime, rsConfig.getConfigVersion(), unexpectedId, diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index c1075aa9dbb..cba04681a31 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -79,8 +79,7 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& replMetadata, rpc::OplogQueryMetadata oqMetadata) { - OpTime newCommitPoint; - newCommitPoint = oqMetadata.getLastOpCommitted(); + OpTimeAndWallTime newCommitPoint = oqMetadata.getLastOpCommitted(); const bool fromSyncSource = true; _replicationCoordinator->advanceCommitPoint(newCommitPoint, fromSyncSource); diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index ac82555ff60..2eb1d27e5ef 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -518,7 +518,8 @@ RemoteCommandResponse makeCursorResponse(CursorId cursorId, bool isFirstBatch = true, int rbid = 1) { OpTime futureOpTime(Timestamp(1000, 1000), 1000); - rpc::OplogQueryMetadata 
oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0); + Date_t futureWallTime = Date_t::min() + Seconds(futureOpTime.getSecs()); + rpc::OplogQueryMetadata oqMetadata({futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0); BSONObjBuilder bob; { diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp index 18cb910496e..197376cd15c 100644 --- a/src/mongo/db/repl/member_data.cpp +++ b/src/mongo/db/repl/member_data.cpp @@ -43,7 +43,7 @@ namespace repl { MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _isSelf(false) { _lastResponse.setState(MemberState::RS_UNKNOWN); _lastResponse.setElectionTime(Timestamp()); - _lastResponse.setAppliedOpTime(OpTime()); + _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime()); } bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) { @@ -65,7 +65,7 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) hbResponse.setElectionTime(_lastResponse.getElectionTime()); } if (!hbResponse.hasAppliedOpTime()) { - hbResponse.setAppliedOpTime(_lastResponse.getAppliedOpTime()); + hbResponse.setAppliedOpTimeAndWallTime(_lastResponse.getAppliedOpTimeAndWallTime()); } // Log if the state changes if (_lastResponse.getState() != hbResponse.getState()) { @@ -73,9 +73,13 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) << hbResponse.getState().toString() << rsLog; } - bool opTimeAdvanced = advanceLastAppliedOpTime(hbResponse.getAppliedOpTime(), now); - auto durableOpTime = hbResponse.hasDurableOpTime() ? hbResponse.getDurableOpTime() : OpTime(); - opTimeAdvanced = advanceLastDurableOpTime(durableOpTime, now) || opTimeAdvanced; + bool opTimeAdvanced = + advanceLastAppliedOpTimeAndWallTime(hbResponse.getAppliedOpTimeAndWallTime(), now); + auto durableOpTimeAndWallTime = hbResponse.hasDurableOpTime() + ? 
hbResponse.getDurableOpTimeAndWallTime() + : OpTimeAndWallTime(); + opTimeAdvanced = + advanceLastDurableOpTimeAndWallTime(durableOpTimeAndWallTime, now) || opTimeAdvanced; _lastResponse = std::move(hbResponse); return opTimeAdvanced; } @@ -95,7 +99,7 @@ void MemberData::setDownValues(Date_t now, const std::string& heartbeatMessage) _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_DOWN); _lastResponse.setElectionTime(Timestamp()); - _lastResponse.setAppliedOpTime(OpTime()); + _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime()); _lastResponse.setSyncingTo(HostAndPort()); // The _lastAppliedOpTime/_lastDurableOpTime fields don't get cleared merely by missing a @@ -118,7 +122,7 @@ void MemberData::setAuthIssue(Date_t now) { _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_UNKNOWN); _lastResponse.setElectionTime(Timestamp()); - _lastResponse.setAppliedOpTime(OpTime()); + _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime()); _lastResponse.setSyncingTo(HostAndPort()); } @@ -129,10 +133,6 @@ void MemberData::setLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_ _lastAppliedWallTime = opTime.wallTime; } -void MemberData::setLastAppliedOpTime(OpTime opTime, Date_t now) { - setLastAppliedOpTimeAndWallTime({opTime, Date_t::min()}, now); -} - void MemberData::setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) { _lastUpdate = now; _lastUpdateStale = false; @@ -150,10 +150,6 @@ void MemberData::setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_ } } -void MemberData::setLastDurableOpTime(OpTime opTime, Date_t now) { - setLastDurableOpTimeAndWallTime({opTime, Date_t::min()}, now); -} - bool MemberData::advanceLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) { _lastUpdate = now; _lastUpdateStale = false; @@ -164,10 +160,6 @@ bool MemberData::advanceLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, D return false; } -bool 
MemberData::advanceLastAppliedOpTime(OpTime opTime, Date_t now) { - return advanceLastAppliedOpTimeAndWallTime({opTime, Date_t::min()}, now); -} - bool MemberData::advanceLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) { _lastUpdate = now; _lastUpdateStale = false; @@ -179,9 +171,5 @@ bool MemberData::advanceLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, D return false; } -bool MemberData::advanceLastDurableOpTime(OpTime opTime, Date_t now) { - return advanceLastDurableOpTimeAndWallTime({opTime, Date_t::min()}, now); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h index cae0e30b4c9..d8887317f83 100644 --- a/src/mongo/db/repl/member_data.h +++ b/src/mongo/db/repl/member_data.h @@ -172,37 +172,21 @@ public: } /** - * Sets the last applied op time (not the heartbeat applied op time) and updates the - * lastUpdate time. - */ - void setLastAppliedOpTime(OpTime opTime, Date_t now); - - /** * Performs setLastAppliedOpTime and also sets the wall clock time corresponding to the last * applied opTime. Should only be used on the current node. */ void setLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now); /** - * Sets the last durable op time (not the heartbeat durable op time) - */ - void setLastDurableOpTime(OpTime opTime, Date_t now); - - /** * Performs setLastDurableOpTime and also sets the wall clock time corresponding to the last * durable opTime. Should only be used on the current node. */ void setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now); - /** * Sets the last applied op time (not the heartbeat applied op time) iff the new optime is * later than the current optime, and updates the lastUpdate time. Returns true if the * optime was advanced. - */ - bool advanceLastAppliedOpTime(OpTime opTime, Date_t now); - - /** * Performs advanceLastAppliedOpTime and also sets the wall clock time corresponding to the last * applied opTime. 
Should only be used on the current node. */ @@ -212,10 +196,6 @@ public: * Sets the last durable op time (not the heartbeat applied op time) iff the new optime is * later than the current optime, and updates the lastUpdate time. Returns true if the * optime was advanced. - */ - bool advanceLastDurableOpTime(OpTime opTime, Date_t now); - - /** * Performs advanceLastDurableOpTime and also sets the wall clock time corresponding to the last * durable opTime. Should only be used on the current node. */ diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 84ebd79b4da..a432da62bb4 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -229,7 +229,19 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( queryResponse.otherFields.metadata.hasElement(rpc::kOplogQueryMetadataFieldName); if (receivedOplogQueryMetadata) { const auto& metadataObj = queryResponse.otherFields.metadata; - auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadataObj); + // Wall clock times are required in OplogQueryMetadata when FCV is 4.2. Arbiters trivially + // have FCV equal to 4.2, so they are excluded from this check. 
+ bool isArbiter = hasGlobalServiceContext() && + repl::ReplicationCoordinator::get(getGlobalServiceContext()) && + repl::ReplicationCoordinator::get(getGlobalServiceContext())->getMemberState() == + MemberState::RS_ARBITER; + bool requireWallTime = + (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + !isArbiter); + auto metadataResult = + rpc::OplogQueryMetadata::readFromMetadata(metadataObj, requireWallTime); if (!metadataResult.isOK()) { return metadataResult.getStatus(); } @@ -469,7 +481,18 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); if (receivedReplMetadata) { const auto& metadataObj = queryResponse.otherFields.metadata; - auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); + // Wall clock times are required in ReplSetMetadata when FCV is 4.2. Arbiters trivially + // have FCV equal to 4.2, so they are excluded from this check. 
+ bool isArbiter = hasGlobalServiceContext() && + repl::ReplicationCoordinator::get(getGlobalServiceContext()) && + repl::ReplicationCoordinator::get(getGlobalServiceContext())->getMemberState() == + MemberState::RS_ARBITER; + bool requireWallTime = + (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + !isArbiter); + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj, requireWallTime); if (!metadataResult.isOK()) { error() << "invalid replication metadata from sync source " << _getSource() << ": " << metadataResult.getStatus() << ": " << metadataObj; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 532821b3cd8..233735db58e 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -94,6 +94,7 @@ protected: OpTime remoteNewerOpTime; OpTime staleOpTime; + Date_t staleWallTime; int rbid; std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; @@ -110,6 +111,7 @@ void OplogFetcherTest::setUp() { remoteNewerOpTime = {{124, 1}, 2}; staleOpTime = {{1, 1}, 0}; + staleWallTime = Date_t::min() + Seconds(staleOpTime.getSecs()); rbid = 2; dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); @@ -130,7 +132,7 @@ BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, int primaryIndex, int syncSourceIndex) { rpc::OplogQueryMetadata oqMetadata( - staleOpTime, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex); + {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex); BSONObjBuilder bob; ASSERT_OK(oqMetadata.writeToMetadata(&bob)); return bob.obj(); @@ -284,7 +286,8 @@ TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher DEATH_TEST_F(OplogFetcherTest, 
ValidMetadataInResponseWithoutOplogMetadataInvariants, "Invariant failure oqMetadata") { - rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2); + rpc::ReplSetMetadata metadata( + 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); @@ -295,8 +298,9 @@ DEATH_TEST_F(OplogFetcherTest, } TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -314,8 +318,10 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe } TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid + 1, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -332,8 +338,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) } TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, 
staleOpTime, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -350,8 +357,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) } TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -369,8 +377,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -390,8 +399,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM // This tests the case where the sync source metadata is behind us but we get a document which // is equal to us. Since that means the metadata is stale and can be ignored, we should accept // this sync source. 
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -406,8 +416,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -422,7 +433,8 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { - rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2); + rpc::ReplSetMetadata metadata( + 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); @@ -436,8 +448,9 @@ TEST_F(OplogFetcherTest, } TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { - rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2); + rpc::ReplSetMetadata replMetadata( + 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata 
oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -757,9 +770,14 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetche TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { rpc::ReplSetMetadata replMetadata( - lastFetched.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1); + lastFetched.getTerm(), {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1); + OpTime committedOpTime = {{Seconds(10000), 0}, 1}; rpc::OplogQueryMetadata oqMetadata( - {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, 2); + {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())}, + {{Seconds(20000), 0}, 1}, + rbid, + 2, + 2); testSyncSourceChecking(&replMetadata, &oqMetadata); @@ -771,15 +789,21 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { - rpc::ReplSetMetadata replMetadata(lastFetched.getTerm(), - {{Seconds(10000), 0}, 1}, - {{Seconds(20000), 0}, 1}, - 1, - OID::gen(), - 2, - 2); + OpTime committedOpTime = {{Seconds(10000), 0}, 1}; + rpc::ReplSetMetadata replMetadata( + lastFetched.getTerm(), + {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())}, + {{Seconds(20000), 0}, 1}, + 1, + OID::gen(), + 2, + 2); rpc::OplogQueryMetadata oqMetadata( - {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, -1); + {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())}, + {{Seconds(20000), 0}, 1}, + rbid, + 2, + -1); testSyncSourceChecking(&replMetadata, &oqMetadata); diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index c1b377f2845..8edb4ccde96 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -162,7 +162,17 @@ private: struct OpTimeAndWallTime { 
OpTime opTime; - Date_t wallTime; + Date_t wallTime = Date_t::min(); + inline bool operator==(const OpTimeAndWallTime& rhs) const { + return opTime == rhs.opTime && wallTime == rhs.wallTime; + } + inline bool operator<(const OpTimeAndWallTime& rhs) const { + // Wall clock time ordering should not matter for calculations of the commit point. + return opTime < rhs.opTime; + } + std::string toString() const { + return opTime.toString() + ", " + wallTime.toString(); + } }; std::ostream& operator<<(std::ostream& out, const OpTimeAndWallTime& opTime); } // namespace repl diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index e347a80f7da..6fdb826d85b 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -578,7 +578,16 @@ public: if (cmdObj.hasField("handshake")) return true; - auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj); + // Wall clock times are required in ReplSetMetadata when FCV is 4.2. Arbiters trivially + // have FCV equal to 4.2, so they are excluded from this check. + bool isArbiter = replCoord->getMemberState() == MemberState::RS_ARBITER; + bool requireWallTime = + (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + !isArbiter); + + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj, requireWallTime); if (metadataResult.isOK()) { // New style update position command has metadata, which may inform the // upstream of a higher term. 
@@ -592,7 +601,13 @@ public: UpdatePositionArgs args; - status = args.initialize(cmdObj); + // re-check requireWallTime + requireWallTime = + (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + !isArbiter); + status = args.initialize(cmdObj, requireWallTime); if (status.isOK()) { status = replCoord->processReplSetUpdatePosition(args, &configVersion); diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.cpp b/src/mongo/db/repl/repl_set_heartbeat_response.cpp index a312970aad3..2c1f5f88734 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response.cpp @@ -39,6 +39,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/bson_extract_optime.h" +#include "mongo/db/server_options.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -54,7 +55,9 @@ const std::string kElectionTimeFieldName = "electionTime"; const std::string kMemberStateFieldName = "state"; const std::string kOkFieldName = "ok"; const std::string kDurableOpTimeFieldName = "durableOpTime"; +const std::string kDurableWallTimeFieldName = "durableWallTime"; const std::string kAppliedOpTimeFieldName = "opTime"; +const std::string kAppliedWallTimeFieldName = "wallTime"; const std::string kPrimaryIdFieldName = "primaryId"; const std::string kReplSetFieldName = "set"; const std::string kSyncSourceFieldName = "syncingTo"; @@ -92,9 +95,11 @@ void ReplSetHeartbeatResponse::addToBSON(BSONObjBuilder* builder) const { } if (_durableOpTimeSet) { _durableOpTime.append(builder, kDurableOpTimeFieldName); + builder->appendDate(kDurableWallTimeFieldName, _durableWallTime); } if (_appliedOpTimeSet) { _appliedOpTime.append(builder, kAppliedOpTimeFieldName); + builder->appendDate(kAppliedWallTimeFieldName, 
_appliedWallTime); } } @@ -104,7 +109,9 @@ BSONObj ReplSetHeartbeatResponse::toBSON() const { return builder.obj(); } -Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term) { +Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, + long long term, + bool requireWallTime) { auto status = getStatusFromCommandResult(doc); if (!status.isOK()) { return status; @@ -146,13 +153,40 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term) if (!status.isOK()) { return status; } + + BSONElement durableWallTimeElement; + _durableWallTime = Date_t::min(); + status = bsonExtractTypedField( + doc, kDurableWallTimeFieldName, BSONType::Date, &durableWallTimeElement); + if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) { + // We ignore NoSuchKey errors if the FeatureCompatibilityVersion is less than 4.2, since + // older version nodes may not report wall clock times. + return status; + } + if (status.isOK()) { + _durableWallTime = durableWallTimeElement.Date(); + } _durableOpTimeSet = true; + // In V1, heartbeats OpTime is type Object and we construct an OpTime out of its nested fields. status = bsonExtractOpTimeField(doc, kAppliedOpTimeFieldName, &_appliedOpTime); if (!status.isOK()) { return status; } + + BSONElement appliedWallTimeElement; + _appliedWallTime = Date_t::min(); + status = bsonExtractTypedField( + doc, kAppliedWallTimeFieldName, BSONType::Date, &appliedWallTimeElement); + if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) { + // We ignore NoSuchKey errors if the FeatureCompatibilityVersion is less than 4.2, since + // older version nodes may not report wall clock times. 
+ return status; + } + if (status.isOK()) { + _appliedWallTime = appliedWallTimeElement.Date(); + } _appliedOpTimeSet = true; const BSONElement memberStateElement = doc[kMemberStateFieldName]; @@ -250,10 +284,20 @@ OpTime ReplSetHeartbeatResponse::getAppliedOpTime() const { return _appliedOpTime; } +OpTimeAndWallTime ReplSetHeartbeatResponse::getAppliedOpTimeAndWallTime() const { + invariant(_appliedOpTimeSet); + return {_appliedOpTime, _appliedWallTime}; +} + OpTime ReplSetHeartbeatResponse::getDurableOpTime() const { invariant(_durableOpTimeSet); return _durableOpTime; } +OpTimeAndWallTime ReplSetHeartbeatResponse::getDurableOpTimeAndWallTime() const { + invariant(_durableOpTimeSet); + return {_durableOpTime, _durableWallTime}; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.h b/src/mongo/db/repl/repl_set_heartbeat_response.h index 2c5fb91538f..3b883d4f017 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response.h +++ b/src/mongo/db/repl/repl_set_heartbeat_response.h @@ -52,8 +52,9 @@ public: /** * Initializes this ReplSetHeartbeatResponse from the contents of "doc". * "term" is only used to complete a V0 OpTime (which is really a Timestamp). + * Wall clock time elements are parsed whenever present; their absence is only an error if FCV is 4.2 (i.e., requireWallTime is true). */ - Status initialize(const BSONObj& doc, long long term); + Status initialize(const BSONObj& doc, long long term, bool requireWallTime); /** * Appends all non-default values to "builder". @@ -104,10 +105,12 @@ public: return _appliedOpTimeSet; } OpTime getAppliedOpTime() const; + OpTimeAndWallTime getAppliedOpTimeAndWallTime() const; bool hasDurableOpTime() const { return _durableOpTimeSet; } OpTime getDurableOpTime() const; + OpTimeAndWallTime getDurableOpTimeAndWallTime() const; /** * Sets _setName to "name". 
@@ -158,13 +161,15 @@ public: _primaryIdSet = true; _primaryId = primaryId; } - void setAppliedOpTime(OpTime time) { + void setAppliedOpTimeAndWallTime(OpTimeAndWallTime time) { _appliedOpTimeSet = true; - _appliedOpTime = time; + _appliedOpTime = time.opTime; + _appliedWallTime = time.wallTime; } - void setDurableOpTime(OpTime time) { + void setDurableOpTimeAndWallTime(OpTimeAndWallTime time) { _durableOpTimeSet = true; - _durableOpTime = time; + _durableOpTime = time.opTime; + _durableWallTime = time.wallTime; } void setTerm(long long term) { _term = term; @@ -176,9 +181,11 @@ private: bool _appliedOpTimeSet = false; OpTime _appliedOpTime; + Date_t _appliedWallTime; bool _durableOpTimeSet = false; OpTime _durableOpTime; + Date_t _durableWallTime; bool _stateSet = false; MemberState _state; diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp index 5c5ec8df7e3..72c7d31190a 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp @@ -48,6 +48,10 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { int fieldsSet = 1; ReplSetHeartbeatResponse hbResponse; ReplSetHeartbeatResponse hbResponseObjRoundTripChecker; + OpTime durableOpTime = OpTime(Timestamp(10), 0); + Date_t durableWallTime = Date_t::min() + Seconds(durableOpTime.getSecs()); + OpTime appliedOpTime = OpTime(Timestamp(50), 0); + Date_t appliedWallTime = Date_t::min() + Seconds(appliedOpTime.getSecs()); ASSERT_EQUALS(false, hbResponse.hasState()); ASSERT_EQUALS(false, hbResponse.hasElectionTime()); ASSERT_EQUALS(false, hbResponse.hasDurableOpTime()); @@ -73,11 +77,11 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { hbResponse.setElectionTime(Timestamp(10, 0)); ++fieldsSet; // set durableOpTime - hbResponse.setDurableOpTime(OpTime(Timestamp(10), 0)); - ++fieldsSet; + 
hbResponse.setDurableOpTimeAndWallTime({durableOpTime, durableWallTime}); + fieldsSet += 2; // OpTime and WallTime are separate fields // set appliedOpTime - hbResponse.setAppliedOpTime(OpTime(Timestamp(50), 0)); - ++fieldsSet; + hbResponse.setAppliedOpTimeAndWallTime({appliedOpTime, appliedWallTime}); + fieldsSet += 2; // OpTime and WallTime are separate fields // set config ReplSetConfig config; hbResponse.setConfig(config); @@ -99,8 +103,10 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { ASSERT_EQUALS(HostAndPort("syncTarget"), hbResponse.getSyncingTo()); ASSERT_EQUALS(1, hbResponse.getConfigVersion()); ASSERT_EQUALS(Timestamp(10, 0), hbResponse.getElectionTime()); - ASSERT_EQUALS(OpTime(Timestamp(0, 10), 0), hbResponse.getDurableOpTime()); - ASSERT_EQUALS(OpTime(Timestamp(0, 50), 0), hbResponse.getAppliedOpTime()); + ASSERT_EQUALS(durableOpTime, hbResponse.getDurableOpTime()); + ASSERT_EQUALS(durableWallTime, hbResponse.getDurableOpTimeAndWallTime().wallTime); + ASSERT_EQUALS(appliedOpTime, hbResponse.getAppliedOpTime()); + ASSERT_EQUALS(appliedWallTime, hbResponse.getAppliedOpTimeAndWallTime().wallTime); ASSERT_EQUALS(config.toBSON().toString(), hbResponse.getConfig().toBSON().toString()); hbResponseObj = hbResponse.toBSON(); @@ -114,7 +120,8 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { ASSERT_EQUALS(2, hbResponseObj["state"].numberLong()); ASSERT_EQUALS("syncTarget:27017", hbResponseObj["syncingTo"].String()); - initializeResult = hbResponseObjRoundTripChecker.initialize(hbResponseObj, 0); + initializeResult = + hbResponseObjRoundTripChecker.initialize(hbResponseObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(Status::OK(), initializeResult); ASSERT_EQUALS(hbResponseObj.toString(), hbResponseObjRoundTripChecker.toBSON().toString()); } @@ -123,7 +130,7 @@ TEST(ReplSetHeartbeatResponse, InitializeWrongElectionTimeType) { ReplSetHeartbeatResponse hbResponse; BSONObj initializerObj = BSON("ok" << 1.0 
<< "electionTime" << "hello"); - Status result = hbResponse.initialize(initializerObj, 0); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"electionTime\" field in response to replSetHeartbeat command to " @@ -135,44 +142,74 @@ TEST(ReplSetHeartbeatResponse, InitializeWrongDurableOpTimeType) { ReplSetHeartbeatResponse hbResponse; BSONObj initializerObj = BSON("ok" << 1.0 << "durableOpTime" << "hello"); - Status result = hbResponse.initialize(initializerObj, 0); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS("\"durableOpTime\" had the wrong type. Expected object, found string", result.reason()); BSONObj initializerObj2 = BSON("ok" << 1.0 << "durableOpTime" << OpTime().getTimestamp()); - Status result2 = hbResponse.initialize(initializerObj2, 0); + Status result2 = hbResponse.initialize(initializerObj2, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result2); ASSERT_EQUALS("\"durableOpTime\" had the wrong type. 
Expected object, found timestamp", result2.reason()); } -TEST(ReplSetHeartbeatResponse, InitializeWrongAppliedOpTimeType) { +TEST(ReplSetHeartbeatResponse, InitializeNoDurableWallTime) { ReplSetHeartbeatResponse hbResponse; BSONObj initializerObj = BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << "hello"); - Status result = hbResponse.initialize(initializerObj, 0); + << OpTime(Timestamp(100, 0), 0).toBSON()); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result); + ASSERT_EQUALS("Missing expected field \"durableWallTime\"", result.reason()); +} + +TEST(ReplSetHeartbeatResponse, InitializeWrongAppliedOpTimeType) { + ReplSetHeartbeatResponse hbResponse; + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << "hello"); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS("\"opTime\" had the wrong type. Expected object, found string", result.reason()); - initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime().getTimestamp()); - result = hbResponse.initialize(initializerObj, 0); + initializerObj = BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() + << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime().getTimestamp()); + result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS("\"opTime\" had the wrong type. 
Expected object, found timestamp", result.reason()); } +TEST(ReplSetHeartbeatResponse, InitializeNoAppliedWallTime) { + ReplSetHeartbeatResponse hbResponse; + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON()); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result); + ASSERT_EQUALS("Missing expected field \"wallTime\"", result.reason()); +} + TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "state" - << "hello"); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "state" + << "hello"); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"state\" field in response to replSetHeartbeat command to " @@ -182,12 +219,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) { TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooLow) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "state" - << -1); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << 
"durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "state" + << -1); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::BadValue, result); ASSERT_EQUALS( "Value for \"state\" in response to replSetHeartbeat is out of range; " @@ -197,12 +238,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooLow) { TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooHigh) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "state" - << 11); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "state" + << 11); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::BadValue, result); ASSERT_EQUALS( "Value for \"state\" in response to replSetHeartbeat is out of range; " @@ -212,12 +257,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooHigh) { TEST(ReplSetHeartbeatResponse, InitializeVersionWrongType) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "v" - << "hello"); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << 
"wallTime" + << Date_t::min() + Seconds(100) + << "v" + << "hello"); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"v\" field in response to replSetHeartbeat to " @@ -227,14 +276,18 @@ TEST(ReplSetHeartbeatResponse, InitializeVersionWrongType) { TEST(ReplSetHeartbeatResponse, InitializeReplSetNameWrongType) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "v" - << 2 // needs a version to get this far in initialize() - << "set" - << 4); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "v" + << 2 // needs a version to get this far in initialize() + << "set" + << 4); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"set\" field in response to replSetHeartbeat to " @@ -244,14 +297,18 @@ TEST(ReplSetHeartbeatResponse, InitializeReplSetNameWrongType) { TEST(ReplSetHeartbeatResponse, InitializeSyncingToWrongType) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "v" - << 2 // needs a version to get this far in initialize() - << "syncingTo" - << 4); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + 
Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "v" + << 2 // needs a version to get this far in initialize() + << "syncingTo" + << 4); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"syncingTo\" field in response to replSetHeartbeat to " @@ -261,14 +318,18 @@ TEST(ReplSetHeartbeatResponse, InitializeSyncingToWrongType) { TEST(ReplSetHeartbeatResponse, InitializeConfigWrongType) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "v" - << 2 // needs a version to get this far in initialize() - << "config" - << 4); - Status result = hbResponse.initialize(initializerObj, 0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "v" + << 2 // needs a version to get this far in initialize() + << "config" + << 4); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); ASSERT_EQUALS( "Expected \"config\" in response to replSetHeartbeat to " @@ -278,14 +339,18 @@ TEST(ReplSetHeartbeatResponse, InitializeConfigWrongType) { TEST(ReplSetHeartbeatResponse, InitializeBadConfig) { ReplSetHeartbeatResponse hbResponse; - BSONObj initializerObj = - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON() - << "v" - << 2 // needs a version to get this far in initialize() - << "config" - << BSON("illegalFieldName" << 2)); - Status result = hbResponse.initialize(initializerObj, 
0); + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) + << "v" + << 2 // needs a version to get this far in initialize() + << "config" + << BSON("illegalFieldName" << 2)); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::BadValue, result); ASSERT_EQUALS("Unexpected field illegalFieldName in replica set configuration", result.reason()); @@ -298,16 +363,22 @@ TEST(ReplSetHeartbeatResponse, NoConfigStillInitializing) { BSONObj initializerObj = BSON("ok" << 0.0 << "code" << ErrorCodes::NotYetInitialized << "errmsg" << "Received heartbeat while still initializing replication system."); - Status result = hbResp.initialize(initializerObj, 0); + Status result = hbResp.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::NotYetInitialized, result.code()); } TEST(ReplSetHeartbeatResponse, InvalidResponseOpTimeMissesConfigVersion) { ReplSetHeartbeatResponse hbResp; - Status result = hbResp.initialize( - BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime" - << OpTime(Timestamp(100, 0), 0).toBSON()), - 0); + Status result = hbResp.initialize(BSON("ok" << 1.0 << "durableOpTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "durableWallTime" + << Date_t::min() + Seconds(100) + << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100)), + 0, + /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.code()); ASSERT_TRUE(stringContains(result.reason(), "\"v\"")) << result.reason() << " doesn't contain 'v' field required error msg"; @@ -318,7 +389,7 @@ TEST(ReplSetHeartbeatResponse, MismatchedReplicaSetNames) { BSONObj initializerObj = BSON("ok" << 0.0 << "code" << 
ErrorCodes::InconsistentReplicaSetNames << "errmsg" << "replica set name doesn't match."); - Status result = hbResponse.initialize(initializerObj, 0); + Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::InconsistentReplicaSetNames, result.code()); } @@ -326,7 +397,9 @@ TEST(ReplSetHeartbeatResponse, AuthFailure) { ReplSetHeartbeatResponse hbResp; std::string errMsg = "Unauthorized"; Status result = hbResp.initialize( - BSON("ok" << 0.0 << "errmsg" << errMsg << "code" << ErrorCodes::Unauthorized), 0); + BSON("ok" << 0.0 << "errmsg" << errMsg << "code" << ErrorCodes::Unauthorized), + 0, + /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code()); ASSERT_EQUALS(errMsg, result.reason()); } @@ -334,7 +407,8 @@ TEST(ReplSetHeartbeatResponse, AuthFailure) { TEST(ReplSetHeartbeatResponse, ServerError) { ReplSetHeartbeatResponse hbResp; std::string errMsg = "Random Error"; - Status result = hbResp.initialize(BSON("ok" << 0.0 << "errmsg" << errMsg), 0); + Status result = + hbResp.initialize(BSON("ok" << 0.0 << "errmsg" << errMsg), 0, /*requireWallTime*/ true); ASSERT_EQUALS(ErrorCodes::UnknownError, result.code()); ASSERT_EQUALS(errMsg, result.reason()); } diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 31a54cbb470..47be83249f7 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -620,7 +620,8 @@ public: * the same branch of history as 'committedOptime', so we update our commit point to * min(committedOptime, lastApplied). */ - virtual void advanceCommitPoint(const OpTime& committedOptime, bool fromSyncSource) = 0; + virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOpTimeAndWallTime, + bool fromSyncSource) = 0; /** * Elections under protocol version 1 are triggered by a timer. @@ -755,6 +756,7 @@ public: * operation in their oplogs. 
This implies such ops will never be rolled back. */ virtual OpTime getLastCommittedOpTime() const = 0; + virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const = 0; /** * Returns a list of objects that contain this node's knowledge of the state of the members of diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f5bede64b57..6670e7c6ec6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1045,7 +1045,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged // our election in onTransitionToPrimary(), above. - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); // Update _canAcceptNonLocalWrites _updateMemberStateFromTopologyCoordinator(lk, opCtx); @@ -1187,7 +1187,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime( opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed); // If we are using applied times to calculate the commit level, update it now. if (!_rsConfig.getWriteConcernMajorityShouldJournal()) { - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); } // Signal anyone waiting on optime changes. @@ -1237,7 +1237,7 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime( opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed); // If we are using durable times to calculate the commit level, update it now. 
if (_rsConfig.getWriteConcernMajorityShouldJournal()) { - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); } } @@ -1523,10 +1523,15 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(getReplicationMode() == modeReplSet); - const UpdatePositionArgs::UpdateInfo update(OpTime(), opTime, cfgVer, memberId); + const UpdatePositionArgs::UpdateInfo update(OpTime(), + Date_t::min(), + opTime, + Date_t::min() + Seconds(opTime.getSecs()), + cfgVer, + memberId); long long configVersion; const auto status = _setLastOptime(lock, update, &configVersion); - _updateLastCommittedOpTime(lock); + _updateLastCommittedOpTimeAndWallTime(lock); return status; } @@ -1536,10 +1541,15 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(getReplicationMode() == modeReplSet); - const UpdatePositionArgs::UpdateInfo update(opTime, OpTime(), cfgVer, memberId); + const UpdatePositionArgs::UpdateInfo update(opTime, + Date_t::min() + Seconds(opTime.getSecs()), + OpTime(), + Date_t::min(), + cfgVer, + memberId); long long configVersion; const auto status = _setLastOptime(lock, update, &configVersion); - _updateLastCommittedOpTime(lock); + _updateLastCommittedOpTimeAndWallTime(lock); return status; } @@ -1552,7 +1562,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime(WithLock lk, const bool advancedOpTime = result.getValue(); // Only update committed optime if the remote optimes increased. if (advancedOpTime) { - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); } _cancelAndRescheduleLivenessUpdate_inlock(args.memberId); @@ -3092,7 +3102,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk, // nodes in the set will contact us. 
_startHeartbeats_inlock(); } - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); return action; } @@ -3320,8 +3330,8 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, _replExecutor->now()); } -void ReplicationCoordinatorImpl::_updateLastCommittedOpTime(WithLock lk) { - if (_topCoord->updateLastCommittedOpTime()) { +void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) { + if (_topCoord->updateLastCommittedOpTimeAndWallTime()) { _setStableTimestampForStorage(lk); } // Wake up any threads waiting for replication that now have their replication @@ -3490,26 +3500,26 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { } } -void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime, - bool fromSyncSource) { +void ReplicationCoordinatorImpl::advanceCommitPoint( + const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) { stdx::unique_lock<stdx::mutex> lk(_mutex); - _advanceCommitPoint(lk, committedOpTime, fromSyncSource); + _advanceCommitPoint(lk, committedOpTimeAndWallTime, fromSyncSource); } -void ReplicationCoordinatorImpl::_advanceCommitPoint(WithLock lk, - const OpTime& committedOpTime, - bool fromSyncSource) { - if (_topCoord->advanceLastCommittedOpTime(committedOpTime, fromSyncSource)) { +void ReplicationCoordinatorImpl::_advanceCommitPoint( + WithLock lk, const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) { + if (_topCoord->advanceLastCommittedOpTimeAndWallTime(committedOpTimeAndWallTime, + fromSyncSource)) { if (_getMemberState_inlock().arbiter()) { // Arbiters do not store replicated data, so we consider their data trivially // consistent. 
_setMyLastAppliedOpTimeAndWallTime( - lk, {committedOpTime, Date_t::min()}, false, DataConsistency::Consistent); + lk, committedOpTimeAndWallTime, false, DataConsistency::Consistent); } _setStableTimestampForStorage(lk); // Even if we have no new snapshot, we need to notify waiters that the commit point moved. - _externalState->notifyOplogMetadataWaiters(committedOpTime); + _externalState->notifyOplogMetadataWaiters(committedOpTimeAndWallTime.opTime); } } @@ -3518,6 +3528,11 @@ OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { return _topCoord->getLastCommittedOpTime(); } +OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _topCoord->getLastCommittedOpTimeAndWallTime(); +} + Status ReplicationCoordinatorImpl::processReplSetRequestVotes( OperationContext* opCtx, const ReplSetRequestVotesArgs& args, diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index e5641c38be9..76a6bf693be 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -219,7 +219,8 @@ public: virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; - virtual void advanceCommitPoint(const OpTime& committedOpTime, bool fromSyncSource) override; + virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOpTimeAndWallTime, + bool fromSyncSource) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -270,6 +271,7 @@ public: boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; virtual OpTime getLastCommittedOpTime() const override; + virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; virtual Status processReplSetRequestVotes(OperationContext* opCtx, const ReplSetRequestVotesArgs& args, @@ -1068,8 +1070,11 @@ private: * our lastApplied, unless 'fromSyncSource'=true, which 
guarantees we are on the same branch of * history as 'committedOpTime', so we update our commit point to min(committedOpTime, * lastApplied). + * Also updates corresponding wall clock time. */ - void _advanceCommitPoint(WithLock lk, const OpTime& committedOpTime, bool fromSyncSource); + void _advanceCommitPoint(WithLock lk, + const OpTimeAndWallTime& committedOpTimeAndWallTime, + bool fromSyncSource); /** * Scan the memberData and determine the highest last applied or last @@ -1080,7 +1085,7 @@ private: * Whether the last applied or last durable op time is used depends on whether * the config getWriteConcernMajorityShouldJournal is set. */ - void _updateLastCommittedOpTime(WithLock lk); + void _updateLastCommittedOpTimeAndWallTime(WithLock lk); /** * Callback that attempts to set the current term in topology coordinator and diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index c98888fd2f4..56de352ac68 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -136,8 +136,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().secondary()) << getReplCoord()->getMemberState().toString(); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen); @@ -200,8 +200,8 @@ TEST_F(ReplCoordTest, StartElectionDoesNotStartAnElectionWhenNodeIsRecovering) { ASSERT(getReplCoord()->getMemberState().recovering()) << getReplCoord()->getMemberState().toString(); - 
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); simulateEnoughHeartbeatsForAllNodesUp(); auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); @@ -221,8 +221,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { << 1), HostAndPort("node1", 12345)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) @@ -259,8 +259,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenAllNodesVoteYea) { << 1); assertStartSuccess(configObj, HostAndPort("node1", 12345)); OperationContextNoop opCtx; - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); startCapturingLogMessages(); simulateSuccessfulV1Election(); @@ -300,8 +300,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenMaxSevenNodesVoteYea) { << 1); assertStartSuccess(configObj, HostAndPort("node1", 12345)); OperationContextNoop opCtx; - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + 
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); startCapturingLogMessages(); simulateSuccessfulV1Election(); @@ -337,8 +337,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringDryRun) OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -398,8 +398,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenDryRunResponseContainsANewerTerm) { OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -466,8 +466,8 @@ TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) { << 1), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); getGlobalFailPointRegistry() ->getFailPoint("blockHeartbeatReconfigFinish") @@ -495,8 +495,10 @@ 
TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) { hbResp2.setConfigVersion(3); hbResp2.setSetName("mySet"); hbResp2.setState(MemberState::RS_SECONDARY); - hbResp2.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp2.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp2.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp2.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); net->runUntil(net->now() + Seconds(10)); // run until we've sent a heartbeat request const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest(); net->scheduleResponse(noi2, net->now(), makeResponseStatus(hbResp2.toBSON())); @@ -528,8 +530,10 @@ TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) { hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); BSONObjBuilder respObj; net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); } else { @@ -590,8 +594,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringRequest OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -640,8 +644,8 @@ TEST_F(ReplCoordTest, 
TransitionToRollbackFailsWhenElectionInProgress) { ReplSetConfig config = assertMakeRSConfig(configObj); OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -680,8 +684,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenVoteRequestResponseContainsANewerTerm) { OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -736,8 +740,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringDryRun) { OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -776,8 +780,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) { OperationContextNoop opCtx; OpTime time1(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs())); 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateEnoughHeartbeatsForAllNodesUp(); @@ -950,8 +954,10 @@ private: } hbResp.setConfigVersion(config.getConfigVersion()); hbResp.setTerm(replCoord->getTerm()); - hbResp.setAppliedOpTime(otherNodesOpTime); - hbResp.setDurableOpTime(otherNodesOpTime); + hbResp.setAppliedOpTimeAndWallTime( + {otherNodesOpTime, Date_t::min() + Seconds(otherNodesOpTime.getSecs())}); + hbResp.setDurableOpTimeAndWallTime( + {otherNodesOpTime, Date_t::min() + Seconds(otherNodesOpTime.getSecs())}); auto response = makeResponseStatus(hbResp.toBSON()); net->scheduleResponse(noi, net->now(), response); } @@ -983,8 +989,10 @@ TEST_F(TakeoverTest, DoesntScheduleCatchupTakeoverIfCatchupDisabledButTakeoverDe OperationContextNoop opCtx; OpTime currentOptime(Timestamp(200, 1), 0); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, 1)); @@ -1026,8 +1034,10 @@ TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfNodeIsFresherThanCurrentPrimary) // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled yet. 
@@ -1077,8 +1087,10 @@ TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfBothTakeoversAnOption) { // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1131,8 +1143,10 @@ TEST_F(TakeoverTest, PrefersPriorityToCatchupTakeoverIfNodeHasHighestPriority) { // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1181,8 +1195,10 @@ TEST_F(TakeoverTest, CatchupTakeoverNotScheduledTwice) { // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. 
replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1246,8 +1262,10 @@ TEST_F(TakeoverTest, CatchupAndPriorityTakeoverNotScheduledAtSameTime) { // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1304,8 +1322,10 @@ TEST_F(TakeoverTest, CatchupTakeoverCallbackCanceledIfElectionTimeoutRuns) { // and some other node became the new primary. Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1377,8 +1397,10 @@ TEST_F(TakeoverTest, CatchupTakeoverCanceledIfTransitionToRollback) { // and some other node became the new primary. 
Once you hear about a primary election // in term 1, your term will be increased. replCoord->updateTerm_forTest(1, nullptr); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Make sure we're secondary and that no catchup takeover has been scheduled. @@ -1440,8 +1462,10 @@ TEST_F(TakeoverTest, SuccessfulCatchupTakeover) { OpTime currentOptime(Timestamp(100, 5000), 0); OpTime behindOptime(Timestamp(100, 4000), 0); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); // Update the term so that the current term is ahead of the term of // the last applied op time. This means that the primary is still in @@ -1511,8 +1535,10 @@ TEST_F(TakeoverTest, CatchupTakeoverDryRunFailsPrimarySaysNo) { OpTime currentOptime(Timestamp(100, 5000), 0); OpTime behindOptime(Timestamp(100, 4000), 0); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); // Update the term so that the current term is ahead of the term of // the last applied op time. 
This means that the primary is still in @@ -1613,8 +1639,10 @@ TEST_F(TakeoverTest, PrimaryCatchesUpBeforeCatchupTakeover) { OperationContextNoop opCtx; OpTime currentOptime(Timestamp(200, 1), 0); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Update the term so that the current term is ahead of the term of @@ -1678,8 +1706,10 @@ TEST_F(TakeoverTest, PrimaryCatchesUpBeforeHighPriorityNodeCatchupTakeover) { OperationContextNoop opCtx; OpTime currentOptime(Timestamp(200, 1), 0); - replCoordSetMyLastAppliedOpTime(currentOptime); - replCoordSetMyLastDurableOpTime(currentOptime); + replCoordSetMyLastAppliedOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); + replCoordSetMyLastDurableOpTime(currentOptime, + Date_t::min() + Seconds(currentOptime.getSecs())); OpTime behindOptime(Timestamp(100, 1), 0); // Update the term so that the current term is ahead of the term of @@ -1760,8 +1790,8 @@ TEST_F(TakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrent OperationContextNoop opCtx; OpTime myOptime(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(myOptime); - replCoordSetMyLastDurableOpTime(myOptime); + replCoordSetMyLastAppliedOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs())); + replCoordSetMyLastDurableOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs())); // Make sure we're secondary and that no priority takeover has been scheduled. 
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); @@ -1807,8 +1837,8 @@ TEST_F(TakeoverTest, SuccessfulPriorityTakeover) { OperationContextNoop opCtx; OpTime myOptime(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(myOptime); - replCoordSetMyLastDurableOpTime(myOptime); + replCoordSetMyLastAppliedOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs())); + replCoordSetMyLastDurableOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs())); // Make sure we're secondary and that no priority takeover has been scheduled. ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); @@ -1866,8 +1896,8 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { OpTime behindOpTime(Timestamp(100, 3999), 0); OpTime closeEnoughOpTime(Timestamp(100, 4000), 0); - replCoordSetMyLastAppliedOpTime(behindOpTime); - replCoordSetMyLastDurableOpTime(behindOpTime); + replCoordSetMyLastAppliedOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs())); + replCoordSetMyLastDurableOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs())); // Make sure we're secondary and that no priority takeover has been scheduled. ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); @@ -1906,8 +1936,10 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) { assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. 
- replCoordSetMyLastAppliedOpTime(closeEnoughOpTime); - replCoordSetMyLastDurableOpTime(closeEnoughOpTime); + replCoordSetMyLastAppliedOpTime(closeEnoughOpTime, + Date_t::min() + Seconds(closeEnoughOpTime.getSecs())); + replCoordSetMyLastDurableOpTime(closeEnoughOpTime, + Date_t::min() + Seconds(closeEnoughOpTime.getSecs())); LastVote lastVoteExpected = LastVote(replCoord->getTerm() + 1, 0); performSuccessfulTakeover(priorityTakeoverTime, @@ -1942,8 +1974,8 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) { OpTime currentOpTime(Timestamp(100, 1), 0); OpTime behindOpTime(Timestamp(97, 1), 0); OpTime closeEnoughOpTime(Timestamp(98, 1), 0); - replCoordSetMyLastAppliedOpTime(behindOpTime); - replCoordSetMyLastDurableOpTime(behindOpTime); + replCoordSetMyLastAppliedOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs())); + replCoordSetMyLastDurableOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs())); // Make sure we're secondary and that no priority takeover has been scheduled. ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); @@ -1983,8 +2015,10 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) { assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0); // Now make us caught up enough to call for priority takeover to succeed. 
- replCoordSetMyLastAppliedOpTime(closeEnoughOpTime); - replCoordSetMyLastDurableOpTime(closeEnoughOpTime); + replCoordSetMyLastAppliedOpTime(closeEnoughOpTime, + Date_t::min() + Seconds(closeEnoughOpTime.getSecs())); + replCoordSetMyLastDurableOpTime(closeEnoughOpTime, + Date_t::min() + Seconds(closeEnoughOpTime.getSecs())); LastVote lastVoteExpected = LastVote(replCoord->getTerm() + 1, 0); performSuccessfulTakeover(priorityTakeoverTime, @@ -2011,8 +2045,8 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) { << BSON("heartbeatIntervalMillis" << 100)), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateEnoughHeartbeatsForAllNodesUp(); // Advance to dry run vote request phase. 
@@ -2076,8 +2110,8 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) << BSON("heartbeatIntervalMillis" << 100)), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateEnoughHeartbeatsForAllNodesUp(); simulateSuccessfulDryRun(); ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole()); @@ -2120,8 +2154,8 @@ protected: hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())}); return makeResponseStatus(hbResp.toBSON()); } @@ -2190,8 +2224,8 @@ protected: assertStartSuccess(configObj, HostAndPort("node1", 12345)); ReplSetConfig config = assertMakeRSConfig(configObj); - replCoordSetMyLastAppliedOpTime(opTime); - replCoordSetMyLastDurableOpTime(opTime); + replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(opTime.getSecs())); + replCoordSetMyLastDurableOpTime(opTime, Date_t::min() + Seconds(opTime.getSecs())); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); simulateSuccessfulV1Voting(); @@ -2254,8 +2288,8 @@ protected: // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal // the optime waiter. 
- void advanceMyLastAppliedOpTime(OpTime opTime) { - replCoordSetMyLastAppliedOpTime(opTime); + void advanceMyLastAppliedOpTime(OpTime opTime, Date_t wallTime = Date_t::min()) { + replCoordSetMyLastAppliedOpTime(opTime, wallTime); getNet()->enterNetwork(); getNet()->runReadyNetworkOperations(); getNet()->exitNetwork(); @@ -2300,7 +2334,7 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - advanceMyLastAppliedOpTime(time2); + advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs())); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); @@ -2446,7 +2480,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { }); ReplicationCoordinatorImpl* replCoord = getReplCoord(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - advanceMyLastAppliedOpTime(time2); + advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs())); ASSERT(replCoord->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest")); @@ -2508,7 +2542,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); // 3) Advancing its applied optime to time 2 isn't enough. - advanceMyLastAppliedOpTime(time2); + advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs())); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. 
@@ -2528,12 +2562,12 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); // 5) Advancing to time 3 isn't enough now. - advanceMyLastAppliedOpTime(time3); + advanceMyLastAppliedOpTime(time3, Date_t::min() + Seconds(time3.getSecs())); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // 6) The node catches up time 4 eventually. startCapturingLogMessages(); - advanceMyLastAppliedOpTime(time4); + advanceMyLastAppliedOpTime(time4, Date_t::min() + Seconds(time4.getSecs())); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest")); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index bb330cdb9a7..f5f7650eb71 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -151,9 +151,17 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( BSONObj resp; if (responseStatus.isOK()) { resp = cbData.response.data; - responseStatus = hbResponse.initialize(resp, _topCoord->getTerm()); + // Wall clock times are required in ReplSetHeartbeatResponse when FCV is 4.2. Arbiters + // trivially have FCV equal to 4.2, so they are excluded from this check. 
+ bool isArbiter = _topCoord->getMemberState() == MemberState::RS_ARBITER; + bool requireWallTime = + (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + !isArbiter); + responseStatus = hbResponse.initialize(resp, _topCoord->getTerm(), requireWallTime); StatusWith<rpc::ReplSetMetadata> replMetadata = - rpc::ReplSetMetadata::readFromMetadata(cbData.response.data); + rpc::ReplSetMetadata::readFromMetadata(cbData.response.data, requireWallTime); LOG_FOR_HEARTBEATS(2) << "Received response to heartbeat (requestId: " << cbData.request.id << ") from " << target << ", " << resp; @@ -229,7 +237,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( hbStatusResponse.getValue().hasState() && hbStatusResponse.getValue().getState() != MemberState::RS_PRIMARY && action.getAdvancedOpTime()) { - _updateLastCommittedOpTime(lk); + _updateLastCommittedOpTimeAndWallTime(lk); } // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 8612d9be65c..361cb5f4883 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -126,8 +126,8 @@ TEST_F(ReplCoordHBV1Test, hbResp.setConfig(rsConfig); // The smallest valid optime in PV1. 
OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()}); BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); @@ -200,8 +200,8 @@ TEST_F(ReplCoordHBV1Test, hbResp.setConfig(rsConfig); // The smallest valid optime in PV1. OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()}); BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); @@ -274,8 +274,8 @@ TEST_F(ReplCoordHBV1Test, hbResp.setConfig(rsConfig); // The smallest valid optime in PV1. OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()}); BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); @@ -386,15 +386,20 @@ TEST_F(ReplCoordHBV1Test, IgnoreTheContentsOfMetadataWhenItsReplicaSetIdDoesNotM hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_PRIMARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()}); BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); - rpc::ReplSetMetadata metadata( - opTime.getTerm(), opTime, opTime, rsConfig.getConfigVersion(), unexpectedId, 1, -1); + rpc::ReplSetMetadata metadata(opTime.getTerm(), + {opTime, Date_t::min()}, + opTime, + rsConfig.getConfigVersion(), + unexpectedId, + 1, + 
-1); uassertStatusOK(metadata.writeToMetadata(&responseBuilder)); heartbeatResponse = makeResponseStatus(responseBuilder.obj()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index f7ceae5194d..e1490b238d2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -83,8 +83,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenReconfigReceivedWhileSecondary) { HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); BSONObjBuilder result; ReplSetReconfigArgs args; @@ -108,8 +108,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); BSONObjBuilder result; @@ -152,8 +152,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + 
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); BSONObjBuilder result; @@ -192,8 +192,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith << BSON("replicaSetId" << OID::gen())), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); BSONObjBuilder result; @@ -233,8 +233,8 @@ TEST_F(ReplCoordTest, << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); BSONObjBuilder result; @@ -314,8 +314,8 @@ TEST_F(ReplCoordTest, << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); Status status(ErrorCodes::InternalError, "Not Set"); @@ -333,8 +333,10 @@ 
TEST_F(ReplCoordTest, hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(5); BSONObjBuilder respObj; - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); respObj << "ok" << 1; hbResp.addToBSON(&respObj); net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj())); @@ -357,8 +359,8 @@ TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenSavingANewConfigFailsDuringRe << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); Status status(ErrorCodes::InternalError, "Not Set"); @@ -386,8 +388,8 @@ TEST_F(ReplCoordTest, << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); Status status(ErrorCodes::InternalError, "Not Set"); @@ -426,8 +428,8 @@ TEST_F(ReplCoordTest, NodeReturnsConfigurationInProgressWhenReceivingAReconfigWh init(); start(HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - 
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); // initiate Status status(ErrorCodes::InternalError, "Not Set"); @@ -475,8 +477,8 @@ TEST_F(ReplCoordTest, PrimaryNodeAcceptsNewConfigWhenReceivingAReconfigWithAComp << BSON("replicaSetId" << OID::gen())), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); Status status(ErrorCodes::InternalError, "Not Set"); @@ -493,8 +495,10 @@ TEST_F(ReplCoordTest, PrimaryNodeAcceptsNewConfigWhenReceivingAReconfigWithAComp hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(2); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); BSONObjBuilder respObj; respObj << "ok" << 1; hbResp.addToBSON(&respObj); @@ -521,8 +525,8 @@ TEST_F( << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + 
Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); @@ -552,8 +556,10 @@ TEST_F( hbResp2.setConfigVersion(3); hbResp2.setSetName("mySet"); hbResp2.setState(MemberState::RS_SECONDARY); - hbResp2.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp2.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp2.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp2.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); BSONObjBuilder respObj2; respObj2 << "ok" << 1; hbResp2.addToBSON(&respObj2); @@ -590,8 +596,8 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); @@ -626,8 +632,10 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi hbResp.setConfigVersion(4); hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); BSONObjBuilder respObj2; respObj2 << "ok" << 1; hbResp.addToBSON(&respObj2); @@ -661,8 +669,8 @@ 
TEST_F(ReplCoordTest, NodeAcceptsConfigFromAReconfigWithForceTrueWhileNotPrimary << "node2:12345"))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); // fail before forced BSONObjBuilder result; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 84372bd0a9d..731e39e6edb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -129,8 +129,8 @@ TEST_F(ReplCoordTest, IsMasterIsFalseDuringStepdown) { ReplSetConfig config = assertMakeRSConfig(configObj); auto replCoord = getReplCoord(); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ASSERT(replCoord->getMemberState().primary()); @@ -417,7 +417,7 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) { hbArgs.setHeartbeatVersion(1); auto appliedTS = Timestamp(3, 3); - replCoordSetMyLastAppliedOpTime(OpTime(appliedTS, 1)); + replCoordSetMyLastAppliedOpTime(OpTime(appliedTS, 1), Date_t::min() + Seconds(100)); Status status(ErrorCodes::InternalError, "Not set"); stdx::thread prsiThread([&] { doReplSetInitiate(getReplCoord(), &status); }); @@ -429,8 +429,10 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) { 
ASSERT_BSONOBJ_EQ(hbArgs.toBSON(), noi->getRequest().cmdObj); ReplSetHeartbeatResponse hbResp; hbResp.setConfigVersion(0); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); getNet()->scheduleResponse( noi, startDate + Milliseconds(10), RemoteCommandResponse(hbResp.toBSON(), Milliseconds(8))); getNet()->runUntil(startDate + Milliseconds(10)); @@ -704,8 +706,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith // Become primary. ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ASSERT(getReplCoord()->getMemberState().primary()); @@ -741,8 +743,8 @@ TEST_F(ReplCoordTest, << 3))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTimeWithTermOne time1(100, 2); @@ -758,8 +760,8 @@ TEST_F(ReplCoordTest, ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); - replCoordSetMyLastAppliedOpTime(time1); 
- replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern); ASSERT_OK(statusAndDur.status); @@ -778,8 +780,8 @@ TEST_F(ReplCoordTest, // 2 nodes waiting for time2 statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2)); @@ -821,8 +823,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes << 3))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTimeWithTermOne time1(100, 2); @@ -839,8 +841,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + 
Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern); ASSERT_OK(statusAndDur.status); @@ -855,8 +857,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes // 2 nodes waiting for time2 statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2)); @@ -892,8 +894,8 @@ TEST_F(ReplCoordTest, << "node4"))), HostAndPort("node0")); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime time1(Timestamp(100, 1), 1); @@ -959,8 +961,8 @@ TEST_F( << BSON("dc" << 2 << "rack" << 3)))), HostAndPort("node0")); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime 
time1(Timestamp(100, 2), 1); @@ -982,8 +984,8 @@ TEST_F( auto opCtx = makeOperationContext(); // Nothing satisfied - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, majorityWriteConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); @@ -1017,8 +1019,8 @@ TEST_F( ASSERT_OK(statusAndDur.status); // multiDC satisfied but not majority or multiRack - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); getReplCoord()->setLastAppliedOptime_forTest(2, 3, time2).transitional_ignore(); getReplCoord()->setLastDurableOptime_forTest(2, 3, time2).transitional_ignore(); @@ -1115,8 +1117,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); @@ -1132,8 +1134,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie awaiter.setOpTime(time1); awaiter.setWriteConcern(writeConcern); awaiter.start(); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + 
Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_OK(statusAndDur.status); @@ -1142,8 +1144,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie // 2 nodes waiting for time2 awaiter.setOpTime(time2); awaiter.start(); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time2)); statusAndDur = awaiter.getResult(); ASSERT_OK(statusAndDur.status); @@ -1179,8 +1181,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); @@ -1196,8 +1198,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo awaiter.setOpTime(time2); awaiter.setWriteConcern(writeConcern); awaiter.start(); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); { NetworkInterfaceMock::InNetworkGuard inNet(getNet()); @@ -1230,8 +1232,8 @@ 
TEST_F(ReplCoordTest, << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); @@ -1280,8 +1282,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); const auto opCtx = makeOperationContext(); @@ -1322,8 +1324,8 @@ TEST_F(ReplCoordTest, << "node3"))), HostAndPort("node1")); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); @@ -1387,9 +1389,12 @@ protected: } // Makes it so enough secondaries are caught up that a stepdown command can succeed. 
- void catchUpSecondaries(const OpTime& desiredOpTime) { + void catchUpSecondaries(const OpTime& desiredOpTime, Date_t desiredWallTime = Date_t::min()) { auto config = getReplCoord()->getConfig(); auto heartbeatInterval = config.getHeartbeatInterval(); + if (desiredWallTime == Date_t::min() && !desiredOpTime.isNull()) { + desiredWallTime = Date_t::min() + Seconds(desiredOpTime.getSecs()); + } enterNetwork(); getNet()->runUntil(getNet()->now() + heartbeatInterval); @@ -1402,8 +1407,8 @@ protected: hbResp.setSetName(hbArgs.getSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(hbArgs.getConfigVersion()); - hbResp.setAppliedOpTime(desiredOpTime); - hbResp.setDurableOpTime(desiredOpTime); + hbResp.setAppliedOpTimeAndWallTime({desiredOpTime, desiredWallTime}); + hbResp.setDurableOpTimeAndWallTime({desiredOpTime, desiredWallTime}); BSONObjBuilder respObj; respObj << "ok" << 1; hbResp.addToBSON(&respObj); @@ -1468,8 +1473,8 @@ TEST_F(ReplCoordTest, ElectionIdTracksTermInPV1) { << "protocolVersion" << 1), HostAndPort("test1", 1234)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); @@ -1532,8 +1537,8 @@ TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppli << "protocolVersion" << 1), HostAndPort("test1", 1234)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); @@ -1578,8 +1583,8 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha << "protocolVersion" << 1), HostAndPort("test1", 1234)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); @@ -1634,8 +1639,8 @@ TEST_F(ReplCoordTest, DrainCompletionMidStepDown) { << "protocolVersion" << 1), HostAndPort("test1", 1234)); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); @@ -1671,8 +1676,8 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) { OpTimeWithTermOne opTime1(100, 1); OpTimeWithTermOne opTime2(200, 1); - replCoordSetMyLastAppliedOpTime(opTime2); - replCoordSetMyLastDurableOpTime(opTime2); + replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100)); // Secondaries not caught up yet. 
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1)); @@ -1691,7 +1696,8 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) { long long configVersion = repl->getConfig().getConfigVersion(); UpdatePositionArgs updatePositionArgs; - ASSERT_OK(updatePositionArgs.initialize( + ASSERT_OK(updatePositionArgsInitialize( + updatePositionArgs, BSON(UpdatePositionArgs::kCommandFieldName << 1 << UpdatePositionArgs::kUpdateArrayFieldName @@ -1701,16 +1707,24 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) { << 1 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime2.asOpTime().toBSON()) + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())) << BSON(UpdatePositionArgs::kConfigVersionFieldName << configVersion << UpdatePositionArgs::kMemberIdFieldName << 2 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime1.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime1.asOpTime().toBSON()))))); + << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime1.asOpTime().getSecs())))))); ASSERT_OK(repl->processReplSetUpdatePosition(updatePositionArgs, &configVersion)); @@ -1725,8 +1739,8 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { OpTimeWithTermOne opTime1(100, 1); OpTimeWithTermOne opTime2(200, 1); - replCoordSetMyLastAppliedOpTime(opTime2); - replCoordSetMyLastDurableOpTime(opTime2); + replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100)); // 
Secondaries not caught up yet. ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1)); @@ -1812,8 +1826,8 @@ TEST_F(StepDownTestWithUnelectableNode, OpTimeWithTermOne opTime1(100, 1); OpTimeWithTermOne opTime2(200, 1); - replCoordSetMyLastAppliedOpTime(opTime2); - replCoordSetMyLastDurableOpTime(opTime2); + replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100)); // No secondaries are caught up yet. ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1)); @@ -1834,7 +1848,8 @@ TEST_F(StepDownTestWithUnelectableNode, long long configVersion = repl->getConfig().getConfigVersion(); UpdatePositionArgs catchupFirstSecondary; - ASSERT_OK(catchupFirstSecondary.initialize( + ASSERT_OK(updatePositionArgsInitialize( + catchupFirstSecondary, BSON(UpdatePositionArgs::kCommandFieldName << 1 << UpdatePositionArgs::kUpdateArrayFieldName @@ -1844,16 +1859,24 @@ TEST_F(StepDownTestWithUnelectableNode, << 1 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime2.asOpTime().toBSON()) + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())) << BSON(UpdatePositionArgs::kConfigVersionFieldName << configVersion << UpdatePositionArgs::kMemberIdFieldName << 2 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime1.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime1.asOpTime().toBSON()))))); + << opTime1.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime1.asOpTime().getSecs())))))); 
ASSERT_OK(repl->processReplSetUpdatePosition(catchupFirstSecondary, &configVersion)); @@ -1864,7 +1887,8 @@ TEST_F(StepDownTestWithUnelectableNode, // there is an electable node, so stepDown will complete. UpdatePositionArgs catchupOtherSecondary; - ASSERT_OK(catchupOtherSecondary.initialize( + ASSERT_OK(updatePositionArgsInitialize( + catchupOtherSecondary, BSON(UpdatePositionArgs::kCommandFieldName << 1 << UpdatePositionArgs::kUpdateArrayFieldName @@ -1874,16 +1898,24 @@ TEST_F(StepDownTestWithUnelectableNode, << 1 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime2.asOpTime().toBSON()) + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())) << BSON(UpdatePositionArgs::kConfigVersionFieldName << configVersion << UpdatePositionArgs::kMemberIdFieldName << 2 << UpdatePositionArgs::kAppliedOpTimeFieldName << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()) << UpdatePositionArgs::kDurableOpTimeFieldName - << opTime2.asOpTime().toBSON()))))); + << opTime2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())))))); ASSERT_OK(repl->processReplSetUpdatePosition(catchupOtherSecondary, &configVersion)); @@ -1897,8 +1929,8 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { OpTimeWithTermOne optime1(100, 1); // All nodes are caught up - replCoordSetMyLastAppliedOpTime(optime1); - replCoordSetMyLastDurableOpTime(optime1); + replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100)); 
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -1916,8 +1948,8 @@ TEST_F(StepDownTest, // Set up this test so that all nodes are caught up. This is necessary to exclude the false // positive case where stepDown returns "ExceededTimeLimit", but not because it could not // acquire the lock, but because it could not satisfy all stepdown conditions on time. - replCoordSetMyLastAppliedOpTime(optime1); - replCoordSetMyLastDurableOpTime(optime1); + replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -1955,8 +1987,16 @@ protected: */ void simulateHeartbeatResponses(OpTime optimePrimary, OpTime optimeLagged, - int numNodesCaughtUp) { + int numNodesCaughtUp, + Date_t wallTimePrimary = Date_t::min(), + Date_t wallTimeLagged = Date_t::min()) { int hbNum = 1; + if (wallTimePrimary == Date_t::min()) { + wallTimePrimary = wallTimePrimary + Seconds(optimePrimary.getSecs()); + } + if (wallTimeLagged == Date_t::min()) { + wallTimeLagged = wallTimeLagged + Seconds(optimeLagged.getSecs()); + } while (getNet()->hasReadyRequests()) { NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); @@ -1972,13 +2012,15 @@ protected: // Catch up 'numNodesCaughtUp' nodes out of 5. OpTime optimeResponse = (hbNum <= numNodesCaughtUp) ? optimePrimary : optimeLagged; + Date_t wallTimeResponse = + (hbNum <= numNodesCaughtUp) ? 
wallTimePrimary : wallTimeLagged; ReplSetHeartbeatResponse hbResp; hbResp.setSetName(hbArgs.getSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(hbArgs.getConfigVersion()); - hbResp.setDurableOpTime(optimeResponse); - hbResp.setAppliedOpTime(optimeResponse); + hbResp.setDurableOpTimeAndWallTime({optimeResponse, wallTimeResponse}); + hbResp.setAppliedOpTimeAndWallTime({optimeResponse, wallTimeResponse}); BSONObjBuilder respObj; respObj << "ok" << 1; hbResp.addToBSON(&respObj); @@ -2018,8 +2060,8 @@ TEST_F(StepDownTestFiveNode, OpTime optimePrimary(Timestamp(100, 2), 1); // All nodes are caught up - replCoordSetMyLastAppliedOpTime(optimePrimary); - replCoordSetMyLastDurableOpTime(optimePrimary); + replCoordSetMyLastAppliedOpTime(optimePrimary, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optimePrimary, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optimeLagged)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optimeLagged)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 3, optimeLagged)); @@ -2055,8 +2097,8 @@ TEST_F( OpTime optimePrimary(Timestamp(100, 2), 1); // All nodes are caught up - replCoordSetMyLastAppliedOpTime(optimePrimary); - replCoordSetMyLastDurableOpTime(optimePrimary); + replCoordSetMyLastAppliedOpTime(optimePrimary, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optimePrimary, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optimeLagged)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optimeLagged)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 3, optimeLagged)); @@ -2144,8 +2186,8 @@ TEST_F(ReplCoordTest, SingleNodeReplSetUnfreeze) { // Become Secondary. 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); ASSERT_TRUE(getTopoCoord().getMemberState().secondary()); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); @@ -2248,8 +2290,8 @@ TEST_F(StepDownTest, OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2286,8 +2328,8 @@ TEST_F(StepDownTest, // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2298,7 +2340,7 @@ TEST_F(StepDownTest, // Step down where the secondary actually has to catch up before the stepDown can succeed. 
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); - catchUpSecondaries(optime2); + catchUpSecondaries(optime2, Date_t::min() + Seconds(optime2.getSecs())); ASSERT_OK(*result.second.get()); ASSERT_TRUE(repl->getMemberState().secondary()); @@ -2311,8 +2353,8 @@ TEST_F(StepDownTest, // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2356,8 +2398,8 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2378,8 +2420,8 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) { // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2415,8 +2457,8 @@ TEST_F(StepDownTest, UnconditionalStepDownFailsStepDownCommand) { // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + 
replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2448,8 +2490,8 @@ TEST_F(StepDownTest, InterruptingStepDownCommandRestoresWriteAvailability) { // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2500,8 +2542,8 @@ TEST_F(StepDownTest, InterruptingAfterUnconditionalStepdownDoesNotRestoreWriteAv // No secondary is caught up auto repl = getReplCoord(); - replCoordSetMyLastAppliedOpTime(optime2); - replCoordSetMyLastDurableOpTime(optime2); + replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1)); @@ -2595,8 +2637,8 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) { OpTime optime1({2, 1}, 1); OpTime optime2({100, 1}, 1); OpTime optime3({100, 2}, 1); - replCoordSetMyLastAppliedOpTime(optime1); - replCoordSetMyLastDurableOpTime(optime1); + replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime3)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(1, 2, optime3)); @@ -2672,8 +2714,8 @@ TEST_F(ReplCoordTest, << 
"test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // Can't unset maintenance mode if it was never set to begin with. Status status = getReplCoord()->setMaintenanceMode(false); @@ -2699,8 +2741,8 @@ TEST_F(ReplCoordTest, << "test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // valid set ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_TRUE(getReplCoord()->getMemberState().recovering()); @@ -2735,8 +2777,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena << "test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // Can set multiple times ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); @@ -2769,8 +2811,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS << "test3:1234"))), HostAndPort("test2", 1234)); 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // We must take the RSTL in mode X before transitioning to RS_ROLLBACK. const auto opCtx = makeOperationContext(); @@ -2815,8 +2857,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) { << "test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // Can't modify maintenance mode when PRIMARY simulateSuccessfulV1Election(); @@ -2854,8 +2896,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection) << "test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); // TODO this election shouldn't have to happen. 
simulateSuccessfulV1Election(); @@ -2924,8 +2966,8 @@ TEST_F(ReplCoordTest, OpTimeWithTermOne time1(100, 1); OpTimeWithTermOne time2(100, 2); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 1, time1)); @@ -2969,8 +3011,8 @@ TEST_F(ReplCoordTest, OpTimeWithTermOne time1(100, 1); OpTimeWithTermOne time2(100, 2); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2, false); @@ -3063,7 +3105,7 @@ TEST_F(ReplCoordTest, IsMaster) { time_t lastWriteDate = 100; OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1); - replCoordSetMyLastAppliedOpTime(opTime); + replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(100)); IsMasterResponse response; getReplCoord()->fillIsMasterForReplSet(&response); @@ -3125,8 +3167,8 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { time_t majorityWriteDate = lastWriteDate; OpTime majorityOpTime = opTime; - replCoordSetMyLastAppliedOpTime(opTime); - replCoordSetMyLastDurableOpTime(opTime); + replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(opTime, Date_t::min() + Seconds(100)); ASSERT_EQUALS(majorityOpTime, getReplCoord()->getCurrentCommittedSnapshotOpTime()); IsMasterResponse response; @@ -3198,14 +3240,14 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { << 2))), HostAndPort("node1", 12345)); 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime time1({100, 1}, 1); OpTime time2({100, 2}, 1); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -3219,17 +3261,23 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { // receive updatePosition containing ourself, should not process the update for self UpdatePositionArgs args; - ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 0 - << UpdatePositionArgs::kDurableOpTimeFieldName - << time2.toBSON() - << UpdatePositionArgs::kAppliedOpTimeFieldName - << time2.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 0 + << UpdatePositionArgs::kDurableOpTimeFieldName + << time2.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(time2.getSecs()) + << UpdatePositionArgs::kAppliedOpTimeFieldName + << time2.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + 
Seconds(time2.getSecs())))))); ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, @@ -3256,14 +3304,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime time1({100, 1}, 1); OpTime time2({100, 2}, 1); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -3271,17 +3319,23 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) // receive updatePosition with incorrect config version UpdatePositionArgs args; - ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 3 - << UpdatePositionArgs::kMemberIdFieldName - << 1 - << UpdatePositionArgs::kDurableOpTimeFieldName - << time2.toBSON() - << UpdatePositionArgs::kAppliedOpTimeFieldName - << time2.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 3 + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kDurableOpTimeFieldName + << time2.toBSON() + << 
UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(time2.getSecs()) + << UpdatePositionArgs::kAppliedOpTimeFieldName + << time2.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(time2.getSecs())))))); auto opCtx = makeOperationContext(); @@ -3313,14 +3367,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime time1({100, 1}, 1); OpTime time2({100, 2}, 1); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -3328,17 +3382,23 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf // receive updatePosition with nonexistent member id UpdatePositionArgs args; - ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 9 - << UpdatePositionArgs::kDurableOpTimeFieldName - << time2.toBSON() - << UpdatePositionArgs::kAppliedOpTimeFieldName - << time2.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << 
BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 9 + << UpdatePositionArgs::kDurableOpTimeFieldName + << time2.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(time2.getSecs()) + << UpdatePositionArgs::kAppliedOpTimeFieldName + << time2.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(time2.getSecs())))))); auto opCtx = makeOperationContext(); @@ -3369,44 +3429,53 @@ TEST_F(ReplCoordTest, << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTimeWithTermOne time1(100, 1); OpTimeWithTermOne time2(100, 2); OpTimeWithTermOne staleTime(10, 0); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastDurableOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100)); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; writeConcern.wNumNodes = 1; // receive a good update position - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); UpdatePositionArgs args; - ASSERT_OK( - args.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 1 - << 
UpdatePositionArgs::kAppliedOpTimeFieldName - << time2.asOpTime().toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << time2.asOpTime().toBSON()) - << BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 2 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << time2.asOpTime().toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << time2.asOpTime().toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << time2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(time2.asOpTime().getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << time2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(time2.asOpTime().getSecs())) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 2 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << time2.asOpTime().toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(time2.asOpTime().getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << time2.asOpTime().toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(time2.asOpTime().getSecs())))))); auto opCtx = makeOperationContext(); @@ -3468,8 +3537,8 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) { disableSnapshots(); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2), 
Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTimeWithTermOne time(100, 2); @@ -3561,8 +3630,8 @@ TEST_F( << 2))), HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTimeWithTermOne time(100, 2); @@ -3635,15 +3704,15 @@ TEST_F(ReplCoordTest, disableSnapshots(); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); OpTime time(Timestamp(100, 2), 1); auto opCtx = makeOperationContext(); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time)); @@ -3720,8 +3789,8 @@ TEST_F(ReplCoordTest, HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); OpTime time(Timestamp(100, 1), 1); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); WriteConcernOptions 
majorityWriteConcern; @@ -3789,8 +3858,8 @@ TEST_F(ReplCoordTest, ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); OpTime zero(Timestamp(0, 0), 0); OpTime time(Timestamp(100, 1), 1); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); ASSERT_EQUALS(zero, getReplCoord()->getLastCommittedOpTime()); @@ -3809,8 +3878,8 @@ TEST_F(ReplCoordTest, // Set a new, later OpTime. OpTime newTime(Timestamp(100, 1), 1); - replCoordSetMyLastAppliedOpTime(newTime); - replCoordSetMyLastDurableOpTime(newTime); + replCoordSetMyLastAppliedOpTime(newTime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(newTime, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime()); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, newTime)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 3, newTime)); @@ -4019,16 +4088,16 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorage) { ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getStorageInterface()->allCommittedTimestamp = Timestamp(1, 1); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); // Advance the commit point so it's higher than all the others. 
- repl->advanceCommitPoint(OpTimeWithTermOne(10, 1), false); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100), false); ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp()); // Check that the stable timestamp is not updated if the all-committed timestamp is behind. - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 2)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 2), Date_t::min() + Seconds(100)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp()); @@ -4036,12 +4105,12 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorage) { // Check that the stable timestamp is updated for the storage engine when we set the applied // optime. - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1), Date_t::min() + Seconds(100)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp); // Check that timestamp cleanup occurs. - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 2)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 2), Date_t::min() + Seconds(100)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(2, 2), stableTimestamp); @@ -4087,7 +4156,7 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorageDisableMajori // Check that the stable timestamp is updated for the storage engine when we set the applied // optime, even though the last committed optime is unset. 
getStorageInterface()->allCommittedTimestamp = Timestamp(1, 1); - replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, 1)); + replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, 1), Date_t::min() + Seconds(100)); ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp()); } @@ -4115,11 +4184,10 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) { << "test3:1234"))), HostAndPort("test2", 1234)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); - auto repl = getReplCoord(); Timestamp stableTimestamp; long long term = 2; @@ -4127,24 +4195,30 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) { getStorageInterface()->allCommittedTimestamp = Timestamp(2, 1); // Add three stable optime candidates. - replCoordSetMyLastAppliedOpTime(OpTime({2, 1}, term)); - replCoordSetMyLastAppliedOpTime(OpTime({2, 2}, term)); - replCoordSetMyLastAppliedOpTime(OpTime({3, 2}, term)); + replCoordSetMyLastAppliedOpTime(OpTime({2, 1}, term), Date_t::min() + Seconds(1)); + replCoordSetMyLastAppliedOpTime(OpTime({2, 2}, term), Date_t::min() + Seconds(2)); + replCoordSetMyLastAppliedOpTime(OpTime({3, 2}, term), Date_t::min() + Seconds(3)); // Set a commit point and check the stable optime. 
- repl->advanceCommitPoint(OpTime({2, 1}, term), false); + replCoordAdvanceCommitPoint(OpTime({2, 1}, term), Date_t::min() + Seconds(1), false); + ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime, + Date_t::min() + Seconds(1)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp); // Check that the stable timestamp is not updated if the all-committed timestamp is behind. - repl->advanceCommitPoint(OpTime({2, 2}, term), false); + replCoordAdvanceCommitPoint(OpTime({2, 2}, term), Date_t::min() + Seconds(2), false); + ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime, + Date_t::min() + Seconds(2)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp); getStorageInterface()->allCommittedTimestamp = Timestamp(4, 4); // Check that the stable timestamp is updated when we advance the commit point. - repl->advanceCommitPoint(OpTime({3, 2}, term), false); + replCoordAdvanceCommitPoint(OpTime({3, 2}, term), Date_t::min() + Seconds(3), false); + ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime, + Date_t::min() + Seconds(3)); stableTimestamp = getStorageInterface()->getStableTimestamp(); ASSERT_EQUALS(Timestamp(3, 2), stableTimestamp); @@ -4177,14 +4251,14 @@ TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) { OpTime commitPoint = OpTime({1, 2}, term); ASSERT_EQUALS(Timestamp::min(), getStorageInterface()->getStableTimestamp()); - replCoordSetMyLastAppliedOpTime(OpTime({0, 1}, term)); + replCoordSetMyLastAppliedOpTime(OpTime({0, 1}, term), Date_t::min() + Seconds(100)); // Advance commit point when it has the same term as the last applied. 
- repl->advanceCommitPoint(commitPoint, false); + replCoordAdvanceCommitPoint(commitPoint, Date_t::min() + Seconds(100), false); - replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, term)); - replCoordSetMyLastAppliedOpTime(OpTime({1, 2}, term)); - replCoordSetMyLastAppliedOpTime(OpTime({1, 3}, term)); - replCoordSetMyLastAppliedOpTime(OpTime({1, 4}, term)); + replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, term), Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTime({1, 2}, term), Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTime({1, 3}, term), Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTime({1, 4}, term), Date_t::min() + Seconds(100)); // The stable timestamp should be equal to the commit point timestamp. const Timestamp stableTimestamp = getStorageInterface()->getStableTimestamp(); @@ -4207,7 +4281,8 @@ TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) { ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates); // Simulate a rollback to the common point. - getExternalState()->setLastOpTimeAndWallTime(rollbackCommonPoint); + getExternalState()->setLastOpTimeAndWallTime( + rollbackCommonPoint, Date_t::min() + Seconds(rollbackCommonPoint.getSecs())); repl->resetLastOpTimesFromOplog(opCtx.get(), ReplicationCoordinator::DataConsistency::Inconsistent); @@ -4237,14 +4312,16 @@ TEST_F(StableOpTimeTest, OpTimeCandidatesAreNotAddedWhenStateIsNotConsistent) { // Set the lastApplied optime forward when data is consistent, and check that it was added to // the candidate set. 
replCoordSetMyLastAppliedOpTimeForward(consistentOpTime, - ReplicationCoordinator::DataConsistency::Consistent); + ReplicationCoordinator::DataConsistency::Consistent, + Date_t::min() + Seconds(100)); ASSERT_EQUALS(consistentOpTime, repl->getMyLastAppliedOpTime()); ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest()); // Set the lastApplied optime forward when data is not consistent, and check that it wasn't // added to the candidate set. replCoordSetMyLastAppliedOpTimeForward(inconsistentOpTime, - ReplicationCoordinator::DataConsistency::Inconsistent); + ReplicationCoordinator::DataConsistency::Inconsistent, + Date_t::min() + Seconds(100)); ASSERT_EQUALS(inconsistentOpTime, repl->getMyLastAppliedOpTime()); ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest()); } @@ -4262,8 +4339,8 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin << 0))), HostAndPort("node1", 12345)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100)); auto opCtx = makeOperationContext(); @@ -4287,8 +4364,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte << 0))), HostAndPort("node1", 12345)); - replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100)); const auto opCtx = makeOperationContext(); killOperation(opCtx.get()); @@ -4328,8 +4405,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi << 0))), HostAndPort("node1", 12345)); - 
replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); - replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100)); auto opCtx = makeOperationContext(); @@ -4352,8 +4429,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi OpTimeWithTermOne time(100, 1); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); auto opCtx = makeOperationContext(); @@ -4400,8 +4477,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) { auto opCtx = makeOperationContext(); runSingleNodeElection(opCtx.get()); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100)); shutdown(opCtx.get()); @@ -4425,8 +4502,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { const auto opCtx = makeOperationContext(); runSingleNodeElection(opCtx.get()); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100)); killOperation(opCtx.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( opCtx.get(), @@ -4448,8 +4525,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) { auto opCtx = makeOperationContext(); runSingleNodeElection(opCtx.get()); - 
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( opCtx.get(), @@ -4471,8 +4548,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) { runSingleNodeElection(opCtx.get()); OpTime time(Timestamp(100, 1), 1); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( opCtx.get(), ReadConcernArgs(time, ReadConcernLevel::kMajorityReadConcern))); @@ -4492,13 +4569,13 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { auto opCtx = makeOperationContext(); runSingleNodeElection(opCtx.get()); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); OpTime committedOpTime(Timestamp(200, 1), 1); auto pseudoLogOp = stdx::async(stdx::launch::async, [this, &committedOpTime]() { // Not guaranteed to be scheduled after waitUntil blocks... 
- replCoordSetMyLastAppliedOpTime(committedOpTime); - replCoordSetMyLastDurableOpTime(committedOpTime); + replCoordSetMyLastAppliedOpTime(committedOpTime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(committedOpTime, Date_t::min() + Seconds(100)); }); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( @@ -4519,15 +4596,15 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { HostAndPort("node1", 12345)); auto opCtx = makeOperationContext(); runSingleNodeElection(opCtx.get()); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100)); OpTime opTimeToWait(Timestamp(100, 1), 1); auto pseudoLogOp = stdx::async(stdx::launch::async, [this, &opTimeToWait]() { // Not guaranteed to be scheduled after waitUntil blocks... 
- replCoordSetMyLastAppliedOpTime(opTimeToWait); - replCoordSetMyLastDurableOpTime(opTimeToWait); + replCoordSetMyLastAppliedOpTime(opTimeToWait, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(opTimeToWait, Date_t::min() + Seconds(100)); }); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( @@ -4590,34 +4667,38 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // lower configVersion - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 2) - << "configVersion" - << 1 - << "primaryIndex" - << 2 - << "term" - << 2 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 2) + << "configVersion" + << 1 + << "primaryIndex" + << 2 + << "term" + << 2 + << "syncSourceIndex" + << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(0, getReplCoord()->getTerm()); // higher configVersion - StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 2) - << "configVersion" - << 100 - << "primaryIndex" - << 2 - << "term" - << 2 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata2 = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) 
<< "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 2) + << "configVersion" + << 100 + << "primaryIndex" + << 2 + << "term" + << 2 + << "syncSourceIndex" + << 1))); getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(0, getReplCoord()->getTerm()); } @@ -4653,16 +4734,19 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer OpTime time(Timestamp(10, 1), 1); OpTime oldTime(Timestamp(9, 1), 1); - replCoordSetMyLastAppliedOpTime(time); + Date_t wallTime = Date_t::min() + Seconds(10); + replCoordSetMyLastAppliedOpTime(time, wallTime); // higher OpTime, should change - getReplCoord()->advanceCommitPoint(time, false); + getReplCoord()->advanceCommitPoint({time, wallTime}, false); ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime); ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // lower OpTime, should not change - getReplCoord()->advanceCommitPoint(oldTime, false); + getReplCoord()->advanceCommitPoint({oldTime, Date_t::min() + Seconds(5)}, false); ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime); ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurrentPrimaryIndex) { @@ -4694,54 +4778,60 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr ASSERT_EQUALS(1, getReplCoord()->getTerm()); // higher term, should change - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 3) - << "configVersion" 
- << 2 - << "primaryIndex" - << 2 - << "term" - << 3 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) + << "configVersion" + << 2 + << "primaryIndex" + << 2 + << "term" + << 3 + << "syncSourceIndex" + << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // lower term, should not change - StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(11, 0) << "t" << 3) - << "configVersion" - << 2 - << "primaryIndex" - << 1 - << "term" - << 2 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata2 = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(11, 0) << "t" << 3) + << "configVersion" + << 2 + << "primaryIndex" + << 1 + << "term" + << 2 + << "syncSourceIndex" + << 1))); getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // same term, should not change - StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << 
BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(11, 0) << "t" << 3) - << "configVersion" - << 2 - << "primaryIndex" - << 1 - << "term" - << 3 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata3 = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(11, 0) << "t" << 3) + << "configVersion" + << 2 + << "primaryIndex" + << 1 + << "term" + << 3 + << "syncSourceIndex" + << 1))); getReplCoord()->processReplSetMetadata(metadata3.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -4776,18 +4866,20 @@ TEST_F(ReplCoordTest, auto config = replCoord->getConfig(); // Higher term - should update term but not last committed optime. - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 3) - << "configVersion" - << config.getConfigVersion() - << "primaryIndex" - << 1 - << "term" - << 3 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) + << "configVersion" + << config.getConfigVersion() + << "primaryIndex" + << 1 + << "term" + << 3 + << "syncSourceIndex" + << 1))); BSONObjBuilder responseBuilder; ASSERT_OK(metadata.getValue().writeToMetadata(&responseBuilder)); @@ -4839,16 +4931,17 @@ TEST_F(ReplCoordTest, 
LastCommittedOpTimeOnlyUpdatedFromHeartbeatWhenLastApplied auto opTime1 = OpTime({10, 1}, 1); auto opTime2 = OpTime({11, 1}, 2); // In higher term. auto commitPoint = OpTime({15, 1}, 2); - replCoordSetMyLastAppliedOpTime(opTime1); + replCoordSetMyLastAppliedOpTime(opTime1, Date_t::min() + Seconds(100)); // Node 1 is the current primary. The commit point has a higher term than lastApplied. - rpc::ReplSetMetadata metadata(2, // term - commitPoint, // committedOpTime - commitPoint, // visibleOpTime - config.getConfigVersion(), - {}, // replset id - 1, // currentPrimaryIndex, - 1); // currentSyncSourceIndex + rpc::ReplSetMetadata metadata( + 2, // term + {commitPoint, Date_t::min() + Seconds(commitPoint.getSecs())}, // committed OpTime + commitPoint, // visibleOpTime + config.getConfigVersion(), + {}, // replset id + 1, // currentPrimaryIndex, + 1); // currentSyncSourceIndex auto net = getNet(); BSONObjBuilder responseBuilder; @@ -4877,7 +4970,7 @@ TEST_F(ReplCoordTest, LastCommittedOpTimeOnlyUpdatedFromHeartbeatWhenLastApplied } // Update lastApplied, so commit point can be advanced. - replCoordSetMyLastAppliedOpTime(opTime2); + replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100)); { net->enterNetwork(); net->runUntil(net->now() + config.getHeartbeatInterval()); @@ -4919,16 +5012,17 @@ TEST_F(ReplCoordTest, LastCommittedOpTimeOnlyUpdatedFromHeartbeatInFCV42) { auto lastAppliedOpTime = OpTime({11, 1}, 2); auto commitPoint = OpTime({15, 1}, 2); - replCoordSetMyLastAppliedOpTime(lastAppliedOpTime); + replCoordSetMyLastAppliedOpTime(lastAppliedOpTime, Date_t::min() + Seconds(100)); // Node 1 is the current primary. 
- rpc::ReplSetMetadata metadata(2, // term - commitPoint, // committedOpTime - commitPoint, // visibleOpTime - config.getConfigVersion(), - {}, // replset id - 1, // currentPrimaryIndex, - 1); // currentSyncSourceIndex + rpc::ReplSetMetadata metadata( + 2, // term + {commitPoint, Date_t::min() + Seconds(commitPoint.getSecs())}, // committed OpTime + commitPoint, // visibleOpTime + config.getConfigVersion(), + {}, // replset id + 1, // currentPrimaryIndex, + 1); // currentSyncSourceIndex auto net = getNet(); BSONObjBuilder responseBuilder; @@ -4993,15 +5087,15 @@ TEST_F(ReplCoordTest, AdvanceCommitPointFromSyncSourceCanSetCommitPointToLastApp HostAndPort("node1", 12345)); ASSERT_EQUALS(OpTime(), getReplCoord()->getLastCommittedOpTime()); - auto lastApplied = OpTime({10, 1}, 1); - auto commitPoint = OpTime({15, 1}, 2); - replCoordSetMyLastAppliedOpTime(lastApplied); + OpTimeAndWallTime lastApplied = {OpTime({10, 1}, 1), Date_t::min() + Seconds(10)}; + OpTimeAndWallTime commitPoint = {OpTime({15, 1}, 2), Date_t::min() + Seconds(15)}; + replCoordSetMyLastAppliedOpTime(lastApplied.opTime, lastApplied.wallTime); const bool fromSyncSource = true; getReplCoord()->advanceCommitPoint(commitPoint, fromSyncSource); // The commit point can be set to lastApplied, even though lastApplied is in a lower term. 
- ASSERT_EQUALS(lastApplied, getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(lastApplied.opTime, getReplCoord()->getLastCommittedOpTime()); } TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { @@ -5029,9 +5123,12 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { OpTime optime1{Timestamp(10, 0), 5}; OpTime optime2{Timestamp(11, 2), 5}; + Date_t wallTime1 = Date_t::min() + Seconds(1); + Date_t wallTime2 = Date_t::min() + Seconds(2); - replCoordSetMyLastAppliedOpTime(optime2); - getReplCoord()->advanceCommitPoint(optime1, false); + replCoordSetMyLastAppliedOpTime(optime2, wallTime2); + // pass dummy Date_t to avoid advanceCommitPoint invariant + getReplCoord()->advanceCommitPoint({optime1, wallTime1}, false); auto opCtx = makeOperationContext(); @@ -5044,17 +5141,19 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { BSONObj metadata = metadataBob.done(); log() << metadata; - auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata); + auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata, /*requireWallTime*/ true); ASSERT_OK(oqMetadata.getStatus()); - ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1); + ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted().opTime, optime1); + ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted().wallTime, wallTime1); ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2); ASSERT_EQ(oqMetadata.getValue().getRBID(), 100); ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1); ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1); - auto replMetadata = rpc::ReplSetMetadata::readFromMetadata(metadata); + auto replMetadata = replReadFromMetadata(metadata); ASSERT_OK(replMetadata.getStatus()); - ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1); + ASSERT_EQ(replMetadata.getValue().getLastOpCommitted().opTime, optime1); + ASSERT_EQ(replMetadata.getValue().getLastOpCommitted().wallTime, wallTime1); ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime()); 
ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2); ASSERT_EQ(replMetadata.getValue().getTerm(), 0); @@ -5092,18 +5191,20 @@ TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) // Higher term - should update term and lastCommittedOpTime since arbiters learn of the // commit point via heartbeats. - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 1) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 1) << "t" << 3) - << "configVersion" - << config.getConfigVersion() - << "primaryIndex" - << 1 - << "term" - << 3 - << "syncSourceIndex" - << 1))); + StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 1) << "t" << 3) << "lastCommittedWall" + << Date_t::min() + Seconds(100) + << "lastOpVisible" + << BSON("ts" << Timestamp(10, 1) << "t" << 3) + << "configVersion" + << config.getConfigVersion() + << "primaryIndex" + << 1 + << "term" + << 3 + << "syncSourceIndex" + << 1))); BSONObjBuilder responseBuilder; ASSERT_OK(metadata.getValue().writeToMetadata(&responseBuilder)); @@ -5299,8 +5400,10 @@ TEST_F(ReplCoordTest, hbResp.setConfigVersion(3); hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); net->runReadyNetworkOperations(); net->exitNetwork(); @@ -5353,8 +5456,10 @@ TEST_F(ReplCoordTest, hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_PRIMARY); 
hbResp.setTerm(replCoord->getTerm()); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)}); hbResp.setConfigVersion(1); // Heartbeat response is scheduled with a delay so that we can be sure that @@ -5497,17 +5602,17 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastAppliedOpTime(time5); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(time5, Date_t::min() + Seconds(100)); // ensure current snapshot follows price is right rules (closest but not greater than) - replCoordSetMyLastDurableOpTime(time3); + replCoordSetMyLastDurableOpTime(time3, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); - replCoordSetMyLastDurableOpTime(time4); + replCoordSetMyLastDurableOpTime(time4, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); - replCoordSetMyLastDurableOpTime(time5); + replCoordSetMyLastDurableOpTime(time5, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } @@ -5533,10 +5638,10 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - replCoordSetMyLastAppliedOpTime(time1); - replCoordSetMyLastAppliedOpTime(time2); - replCoordSetMyLastAppliedOpTime(time5); - replCoordSetMyLastDurableOpTime(time5); + 
replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(time5, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time5, Date_t::min() + Seconds(100)); // ensure dropping all snapshots should reset the current committed snapshot getReplCoord()->dropAllSnapshots(); @@ -5561,11 +5666,11 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) { OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); - replCoordSetMyLastAppliedOpTime(time2); + replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); - replCoordSetMyLastDurableOpTime(time2); + replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } @@ -5589,12 +5694,12 @@ TEST_F(ReplCoordTest, OpTime time3(Timestamp(100, 3), term); auto consistency = ReplicationCoordinator::DataConsistency::Consistent; - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime()); - replCoordSetMyLastAppliedOpTimeForward(time3, consistency); + replCoordSetMyLastAppliedOpTimeForward(time3, consistency, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime()); - replCoordSetMyLastAppliedOpTimeForward(time2, consistency); - replCoordSetMyLastDurableOpTimeForward(time2); + replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTimeForward(time2, Date_t::min() + Seconds(100)); 
ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime()); } @@ -5617,11 +5722,11 @@ DEATH_TEST_F(ReplCoordTest, OpTime time2(Timestamp(99, 1), 2); auto consistency = ReplicationCoordinator::DataConsistency::Consistent; - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime()); // Since in pv1, oplog entries are ordered by non-decreasing // term and strictly increasing timestamp, it leads to invariant failure. - replCoordSetMyLastAppliedOpTimeForward(time2, consistency); + replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100)); } DEATH_TEST_F(ReplCoordTest, @@ -5643,11 +5748,11 @@ DEATH_TEST_F(ReplCoordTest, OpTime time2(Timestamp(100, 1), 2); auto consistency = ReplicationCoordinator::DataConsistency::Consistent; - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime()); // Since in pv1, oplog entries are ordered by non-decreasing // term and strictly increasing timestamp, it leads to invariant failure. - replCoordSetMyLastAppliedOpTimeForward(time2, consistency); + replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100)); } DEATH_TEST_F(ReplCoordTest, @@ -5669,11 +5774,11 @@ DEATH_TEST_F(ReplCoordTest, OpTime time2(Timestamp(100, 2), 0); auto consistency = ReplicationCoordinator::DataConsistency::Consistent; - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime()); // Since in pv1, oplog entries are ordered by non-decreasing // term and strictly increasing timestamp, it leads to invariant failure. 
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency); + replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100)); } DEATH_TEST_F(ReplCoordTest, @@ -5695,11 +5800,11 @@ DEATH_TEST_F(ReplCoordTest, OpTime time2(Timestamp(100, 1), 0); auto consistency = ReplicationCoordinator::DataConsistency::Consistent; - replCoordSetMyLastAppliedOpTime(time1); + replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100)); ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime()); // Since in pv1, oplog entries are ordered by non-decreasing // term and strictly increasing timestamp, it leads to invariant failure. - replCoordSetMyLastAppliedOpTimeForward(time2, consistency); + replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100)); } TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelievedToBeUp) { @@ -5721,8 +5826,8 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)), HostAndPort("test1", 1234)); OpTime optime(Timestamp(100, 2), 0); - replCoordSetMyLastAppliedOpTime(optime); - replCoordSetMyLastDurableOpTime(optime); + replCoordSetMyLastAppliedOpTime(optime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime, Date_t::min() + Seconds(100)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(1, 1, optime)); @@ -5795,21 +5900,30 @@ TEST_F(ReplCoordTest, UpdatePositionCmdHasMetadata) { << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)), HostAndPort("test1", 1234)); OpTime optime(Timestamp(100, 2), 0); - replCoordSetMyLastAppliedOpTime(optime); - replCoordSetMyLastDurableOpTime(optime); + replCoordSetMyLastAppliedOpTime(optime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(optime, Date_t::min() + Seconds(100)); auto opCtx = 
makeOperationContext(); - // Set last committed optime via metadata. - rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); + // Set last committed optime via metadata. Pass dummy Date_t to avoid advanceCommitPoint + // invariant. + rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), + {optime, Date_t::min() + Seconds(optime.getSecs())}, + optime, + 1, + OID(), + -1, + 1); getReplCoord()->processReplSetMetadata(syncSourceMetadata); - getReplCoord()->advanceCommitPoint(optime, true); + // Pass dummy Date_t to avoid advanceCommitPoint invariant. + getReplCoord()->advanceCommitPoint({optime, Date_t::min() + Seconds(optime.getSecs())}, true); BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand()); - auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd)); + auto metadata = unittest::assertGet(replReadFromMetadata(cmd)); ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm()); ASSERT_EQUALS(metadata.getLastOpVisible(), optime); - auto oqMetadataStatus = rpc::OplogQueryMetadata::readFromMetadata(cmd); + auto oqMetadataStatus = + rpc::OplogQueryMetadata::readFromMetadata(cmd, /*requireWallTime*/ true); ASSERT_EQUALS(oqMetadataStatus.getStatus(), ErrorCodes::NoSuchKey); } @@ -5847,47 +5961,64 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod HostAndPort("node1", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); OpTime startingOpTime = OpTime(Timestamp(100, 1), 0); - replCoordSetMyLastAppliedOpTime(startingOpTime); - replCoordSetMyLastDurableOpTime(startingOpTime); + replCoordSetMyLastAppliedOpTime(startingOpTime, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(startingOpTime, Date_t::min() + Seconds(100)); // Receive notification that every node is up. 
UpdatePositionArgs args; - ASSERT_OK( - args.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 1 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()) - << BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 2 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()) - << BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 3 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()) - << BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 4 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 2 + 
<< UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 3 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 4 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())))))); ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0)); // Become PRIMARY. @@ -5895,26 +6026,36 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod // Keep two nodes alive via UpdatePosition. 
UpdatePositionArgs args1; - ASSERT_OK( - args1.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 1 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()) - << BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 2 - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args1, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())) + << BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 2 + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())))), + /*requireWallTime*/ true)); const Date_t startDate = getNet()->now(); getNet()->enterNetwork(); getNet()->runUntil(startDate + Milliseconds(100)); @@ -5952,18 
+6093,23 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod // Keep one node alive via two methods (UpdatePosition and requestHeartbeat). UpdatePositionArgs args2; - ASSERT_OK( - args2.initialize(BSON(UpdatePositionArgs::kCommandFieldName - << 1 - << UpdatePositionArgs::kUpdateArrayFieldName - << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName - << 2 - << UpdatePositionArgs::kMemberIdFieldName - << 1 - << UpdatePositionArgs::kDurableOpTimeFieldName - << startingOpTime.toBSON() - << UpdatePositionArgs::kAppliedOpTimeFieldName - << startingOpTime.toBSON()))))); + ASSERT_OK(updatePositionArgsInitialize( + args2, + BSON(UpdatePositionArgs::kCommandFieldName + << 1 + << UpdatePositionArgs::kUpdateArrayFieldName + << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName + << 2 + << UpdatePositionArgs::kMemberIdFieldName + << 1 + << UpdatePositionArgs::kDurableOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kDurableWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs()) + << UpdatePositionArgs::kAppliedOpTimeFieldName + << startingOpTime.toBSON() + << UpdatePositionArgs::kAppliedWallTimeFieldName + << Date_t::min() + Seconds(startingOpTime.getSecs())))))); ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args2, 0)); hbArgs.setSetName("mySet"); @@ -6001,8 +6147,8 @@ TEST_F(ReplCoordTest, WaitForMemberState) { HostAndPort("test1", 1234)); auto replCoord = getReplCoord(); auto initialTerm = replCoord->getTerm(); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); // Single node cluster - this node should start election on setFollowerMode() completion. 
@@ -6038,8 +6184,8 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { HostAndPort("test1", 1234)); auto replCoord = getReplCoord(); auto initialTerm = replCoord->getTerm(); - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100)); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); // Single node cluster - this node should start election on setFollowerMode() completion. @@ -6184,8 +6330,8 @@ TEST_F(ReplCoordTest, NodeStoresElectionVotes) { HostAndPort("node1", 12345)); auto time = OpTimeWithTermOne(100, 1); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); auto opCtx = makeOperationContext(); @@ -6238,8 +6384,8 @@ TEST_F(ReplCoordTest, NodeDoesNotStoreDryRunVotes) { HostAndPort("node1", 12345)); auto time = OpTimeWithTermOne(100, 1); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); auto opCtx = makeOperationContext(); @@ -6290,8 +6436,8 @@ TEST_F(ReplCoordTest, NodeFailsVoteRequestIfItFailsToStoreLastVote) { HostAndPort("node1", 12345)); auto time = OpTimeWithTermOne(100, 1); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + 
replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); // Get our current term, as primary. @@ -6351,8 +6497,8 @@ TEST_F(ReplCoordTest, NodeNodesNotGrantVoteIfInTerminalShutdown) { HostAndPort("node1", 12345)); auto time = OpTimeWithTermOne(100, 1); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - replCoordSetMyLastAppliedOpTime(time); - replCoordSetMyLastDurableOpTime(time); + replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100)); + replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100)); simulateSuccessfulV1Election(); // Get our current term, as primary. diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9f1ef793a58..79649d9b85f 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -325,8 +325,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {} -void ReplicationCoordinatorMock::advanceCommitPoint(const OpTime& committedOptime, - bool fromSyncSource) {} +void ReplicationCoordinatorMock::advanceCommitPoint( + const OpTimeAndWallTime& committedOptimeAndWallTime, bool fromSyncSource) {} void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {} @@ -451,6 +451,10 @@ OpTime ReplicationCoordinatorMock::getLastCommittedOpTime() const { return OpTime(); } +OpTimeAndWallTime ReplicationCoordinatorMock::getLastCommittedOpTimeAndWallTime() const { + return {OpTime(), Date_t::min()}; +} + Status ReplicationCoordinatorMock::processReplSetRequestVotes( OperationContext* opCtx, const ReplSetRequestVotesArgs& args, diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h 
index 3389522810b..75052f1c8f7 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -189,7 +189,8 @@ public: virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; - virtual void advanceCommitPoint(const OpTime& committedOptime, bool fromSyncSource) override; + virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOptimeAndWallTime, + bool fromSyncSource) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -238,6 +239,8 @@ public: virtual OpTime getLastCommittedOpTime() const; + virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const; + virtual std::vector<MemberData> getMemberData() const override; virtual Status processReplSetRequestVotes(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index c788439d91d..f3d5cfb637f 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -231,8 +231,10 @@ void ReplCoordTest::simulateEnoughHeartbeatsForAllNodesUp() { hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(OpTime(Timestamp(100, 2), 0)); - hbResp.setDurableOpTime(OpTime(Timestamp(100, 2), 0)); + hbResp.setAppliedOpTimeAndWallTime( + {OpTime(Timestamp(100, 2), 0), Date_t::min() + Seconds(100)}); + hbResp.setDurableOpTimeAndWallTime( + {OpTime(Timestamp(100, 2), 0), Date_t::min() + Seconds(100)}); BSONObjBuilder respObj; net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); } else { @@ -334,8 +336,8 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e hbResp.setState(MemberState::RS_SECONDARY); // The smallest valid optime in PV1. 
OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())}); hbResp.setConfigVersion(rsConfig.getConfigVersion()); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { @@ -387,8 +389,8 @@ void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) { } void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) { - replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0)); - replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(1)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(1)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getReplCoord()->waitForElectionFinish_forTest(); @@ -428,8 +430,10 @@ bool ReplCoordTest::consumeHeartbeatV1(const NetworkInterfaceMock::NetworkOperat hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(lastApplied); - hbResp.setDurableOpTime(lastApplied); + hbResp.setAppliedOpTimeAndWallTime( + {lastApplied, Date_t::min() + Seconds(lastApplied.getSecs())}); + hbResp.setDurableOpTimeAndWallTime( + {lastApplied, Date_t::min() + Seconds(lastApplied.getSecs())}); BSONObjBuilder respObj; net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); return true; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index db1406e1227..ab831e8820a 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ 
b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -52,6 +52,7 @@ class ReplicationCoordinatorExternalStateMock; class ReplicationCoordinatorImpl; class StorageInterfaceMock; class TopologyCoordinator; +class UpdatePositionArgs; using executor::NetworkInterfaceMock; @@ -105,25 +106,57 @@ protected: return _repl.get(); } + Status updatePositionArgsInitialize(UpdatePositionArgs& args, + const BSONObj& argsObj, + bool requireWallTime = true) { + return args.initialize(argsObj, requireWallTime); + } + + StatusWith<rpc::ReplSetMetadata> replReadFromMetadata(const BSONObj& doc, + bool requireWallTime = true) { + return rpc::ReplSetMetadata::readFromMetadata(doc, requireWallTime); + } + void replCoordSetMyLastAppliedOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getReplCoord()->setMyLastAppliedOpTimeAndWallTime({opTime, wallTime}); } void replCoordSetMyLastAppliedOpTimeForward(const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency, Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getReplCoord()->setMyLastAppliedOpTimeAndWallTimeForward({opTime, wallTime}, consistency); } void replCoordSetMyLastDurableOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getReplCoord()->setMyLastDurableOpTimeAndWallTime({opTime, wallTime}); } void replCoordSetMyLastDurableOpTimeForward(const OpTime& opTime, Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getReplCoord()->setMyLastDurableOpTimeAndWallTimeForward({opTime, wallTime}); } + void replCoordAdvanceCommitPoint(const OpTime& opTime, + Date_t wallTime = Date_t::min(), + bool fromSyncSource = false) { + if (wallTime == Date_t::min()) { + wallTime = 
wallTime + Seconds(opTime.getSecs()); + } + getReplCoord()->advanceCommitPoint({opTime, wallTime}, fromSyncSource); + } + /** * Gets the storage interface. */ diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index 5e7c852d211..6ad2390d3c0 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -54,7 +54,7 @@ const char kConfigVersionFieldName[] = "configVersion"; template <typename UpdatePositionArgsType> long long _parseCommandRequestConfigVersion(const BSONObj& commandRequest) { UpdatePositionArgsType args; - if (!args.initialize(commandRequest).isOK()) { + if (!args.initialize(commandRequest, /*requireWallTime*/ false).isOK()) { return -1; } if (args.updatesBegin() == args.updatesEnd()) { diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index fab0a0821b7..0c28ff717fb 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -72,8 +72,12 @@ public: BSONObjBuilder entry(arrayBuilder.subobjStart()); itr.second.lastDurableOpTime.append(&entry, UpdatePositionArgs::kDurableOpTimeFieldName); + entry.appendDate(UpdatePositionArgs::kDurableWallTimeFieldName, + Date_t::min() + Seconds(itr.second.lastDurableOpTime.getSecs())); itr.second.lastAppliedOpTime.append(&entry, UpdatePositionArgs::kAppliedOpTimeFieldName); + entry.appendDate(UpdatePositionArgs::kAppliedWallTimeFieldName, + Date_t::min() + Seconds(itr.second.lastAppliedOpTime.getSecs())); entry.append(UpdatePositionArgs::kMemberIdFieldName, itr.first); if (_configVersion != -1) { entry.append(UpdatePositionArgs::kConfigVersionFieldName, _configVersion); diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 4b682455e06..31e0b8ea1a3 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -589,10 +589,10 @@ Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now, 
response->setElectionTime(_electionTime); } - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - const OpTime lastOpDurable = getMyLastDurableOpTime(); - response->setAppliedOpTime(lastOpApplied); - response->setDurableOpTime(lastOpDurable); + const OpTimeAndWallTime lastOpApplied = getMyLastAppliedOpTimeAndWallTime(); + const OpTimeAndWallTime lastOpDurable = getMyLastDurableOpTimeAndWallTime(); + response->setAppliedOpTimeAndWallTime(lastOpApplied); + response->setDurableOpTimeAndWallTime(lastOpDurable); if (_currentPrimaryIndex != -1) { response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId()); @@ -977,6 +977,7 @@ void TopologyCoordinator::setMyLastAppliedOpTimeAndWallTime(OpTimeAndWallTime op myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm || opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp()); } + myMemberData.setLastAppliedOpTimeAndWallTime(opTimeAndWallTime, now); } @@ -1053,10 +1054,11 @@ StatusWith<bool> TopologyCoordinator::setLastOptime(const UpdatePositionArgs::Up << memberData->getLastDurableOpTime() << "; updating to optime " << args.appliedOpTime << " and durable through " << args.durableOpTime; - - bool advancedOpTime = memberData->advanceLastAppliedOpTime(args.appliedOpTime, now); - advancedOpTime = - memberData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime; + bool advancedOpTime = memberData->advanceLastAppliedOpTimeAndWallTime( + {args.appliedOpTime, args.appliedWallTime}, now); + advancedOpTime = memberData->advanceLastDurableOpTimeAndWallTime( + {args.durableOpTime, args.durableWallTime}, now) || + advancedOpTime; return advancedOpTime; } @@ -1383,7 +1385,9 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex, ReplSetHeartbeatResponse hbResponse; hbResponse.setState(MemberState::RS_PRIMARY); hbResponse.setElectionTime(electionTime); - hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime()); + 
hbResponse.setAppliedOpTimeAndWallTime( + {_memberData.at(primaryIndex).getHeartbeatAppliedOpTime(), + Date_t::min() + Seconds(1)}); hbResponse.setSyncingTo(HostAndPort()); _memberData.at(primaryIndex) .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(), @@ -1592,12 +1596,16 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu // New optimes, to hold them all. BSONObjBuilder optimes; - _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime"); + _lastCommittedOpTimeAndWallTime.opTime.append(&optimes, "lastCommittedOpTime"); + + if (_lastCommittedOpTimeAndWallTime.wallTime.isFormattable()) { + optimes.appendDate("lastCommittedWallTime", _lastCommittedOpTimeAndWallTime.wallTime); + } + if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) { rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime"); } - appendOpTime(&optimes, "appliedOpTime", lastOpApplied); appendOpTime(&optimes, "durableOpTime", lastOpDurable); @@ -1653,8 +1661,12 @@ StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand( BSONObjBuilder entry(arrayBuilder.subobjStart()); memberData.getLastDurableOpTime().append(&entry, UpdatePositionArgs::kDurableOpTimeFieldName); + entry.appendDate(UpdatePositionArgs::kDurableWallTimeFieldName, + memberData.getLastDurableWallTime()); memberData.getLastAppliedOpTime().append(&entry, UpdatePositionArgs::kAppliedOpTimeFieldName); + entry.appendDate(UpdatePositionArgs::kAppliedWallTimeFieldName, + memberData.getLastAppliedWallTime()); entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId()); entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); } @@ -2381,7 +2393,7 @@ void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) { _setLeaderMode(LeaderMode::kNotLeader); } -bool TopologyCoordinator::updateLastCommittedOpTime() { +bool TopologyCoordinator::updateLastCommittedOpTimeAndWallTime() { // If we're not 
primary or we're stepping down due to learning of a new term then we must not // advance the commit point. If we are stepping down due to a user request, however, then it // is safe to advance the commit point, and in fact we must since the stepdown request may be @@ -2393,33 +2405,39 @@ bool TopologyCoordinator::updateLastCommittedOpTime() { // Whether we use the applied or durable OpTime for the commit point is decided here. const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal(); - std::vector<OpTime> votingNodesOpTimes; + std::vector<OpTimeAndWallTime> votingNodesOpTimesAndWallTimes; for (const auto& memberData : _memberData) { int memberIndex = memberData.getConfigIndex(); invariant(memberIndex >= 0); const auto& memberConfig = _rsConfig.getMemberAt(memberIndex); if (memberConfig.isVoter()) { - const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime() - : memberData.getLastAppliedOpTime(); - votingNodesOpTimes.push_back(opTime); + const OpTimeAndWallTime durableOpTime = {memberData.getLastDurableOpTime(), + memberData.getLastDurableWallTime()}; + const OpTimeAndWallTime appliedOpTime = {memberData.getLastAppliedOpTime(), + memberData.getLastAppliedWallTime()}; + const OpTimeAndWallTime opTime = useDurableOpTime ? 
durableOpTime : appliedOpTime; + votingNodesOpTimesAndWallTimes.push_back(opTime); } } - invariant(votingNodesOpTimes.size() > 0); - if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) { + invariant(votingNodesOpTimesAndWallTimes.size() > 0); + if (votingNodesOpTimesAndWallTimes.size() < + static_cast<unsigned long>(_rsConfig.getWriteMajority())) { return false; } - std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); + std::sort(votingNodesOpTimesAndWallTimes.begin(), votingNodesOpTimesAndWallTimes.end()); // need the majority to have this OpTime - OpTime committedOpTime = - votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; + OpTimeAndWallTime committedOpTime = + votingNodesOpTimesAndWallTimes[votingNodesOpTimesAndWallTimes.size() - + _rsConfig.getWriteMajority()]; const bool fromSyncSource = false; - return advanceLastCommittedOpTime(committedOpTime, fromSyncSource); + return advanceLastCommittedOpTimeAndWallTime(committedOpTime, fromSyncSource); } -bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime, bool fromSyncSource) { +bool TopologyCoordinator::advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime committedOpTime, + bool fromSyncSource) { if (_selfIndex == -1) { // The config hasn't been installed or we are not in the config. This could happen // on heartbeats before installing a config. @@ -2427,43 +2445,48 @@ bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime, boo } // This check is performed to ensure primaries do not commit an OpTime from a previous term. 
- if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) { + if (_iAmPrimary() && committedOpTime.opTime < _firstOpTimeOfMyTerm) { LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: " - << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm; + << committedOpTime.opTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm; return false; } // Arbiters don't have data so they always advance their commit point via heartbeats. if (!_selfConfig().isArbiter() && - getMyLastAppliedOpTime().getTerm() != committedOpTime.getTerm()) { + getMyLastAppliedOpTime().getTerm() != committedOpTime.opTime.getTerm()) { if (fromSyncSource) { - committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTime()); + committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTimeAndWallTime()); } else { LOG(1) << "Ignoring commit point with different term than my lastApplied, since it " "may " "not be on the same oplog branch as mine. optime: " - << committedOpTime << ", my last applied: " << getMyLastAppliedOpTime(); + << committedOpTime + << ", my last applied: " << getMyLastAppliedOpTimeAndWallTime(); return false; } } - if (committedOpTime == _lastCommittedOpTime) { + if (committedOpTime.opTime == _lastCommittedOpTimeAndWallTime.opTime) { return false; // Hasn't changed, so ignore it. 
} - if (committedOpTime < _lastCommittedOpTime) { + if (committedOpTime.opTime < _lastCommittedOpTimeAndWallTime.opTime) { LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime - << ", currentCommittedOpTime: " << _lastCommittedOpTime; + << ", currentCommittedOpTime: " << _lastCommittedOpTimeAndWallTime; return false; } - LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime; - _lastCommittedOpTime = committedOpTime; + LOG(2) << "Updating _lastCommittedOpTimeAndWallTime to " << committedOpTime; + _lastCommittedOpTimeAndWallTime = committedOpTime; return true; } OpTime TopologyCoordinator::getLastCommittedOpTime() const { - return _lastCommittedOpTime; + return _lastCommittedOpTimeAndWallTime.opTime; +} + +OpTimeAndWallTime TopologyCoordinator::getLastCommittedOpTimeAndWallTime() const { + return _lastCommittedOpTimeAndWallTime; } bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const { @@ -2652,7 +2675,7 @@ bool TopologyCoordinator::shouldChangeSyncSource( rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata( const OpTime& lastVisibleOpTime) const { return rpc::ReplSetMetadata(_term, - _lastCommittedOpTime, + _lastCommittedOpTimeAndWallTime, lastVisibleOpTime, _rsConfig.getConfigVersion(), _rsConfig.getReplicaSetId(), @@ -2661,7 +2684,7 @@ rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata( } rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const { - return rpc::OplogQueryMetadata(_lastCommittedOpTime, + return rpc::OplogQueryMetadata(_lastCommittedOpTimeAndWallTime, getMyLastAppliedOpTime(), rbid, _currentPrimaryIndex, diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 9b96f613053..b462c9a3251 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -230,7 +230,7 @@ public: * the config getWriteConcernMajorityShouldJournal is set. 
* Returns true if the _lastCommittedOpTime was changed. */ - bool updateLastCommittedOpTime(); + bool updateLastCommittedOpTimeAndWallTime(); /** * Updates _lastCommittedOpTime to be 'committedOpTime' if it is more recent than the current @@ -239,13 +239,16 @@ public: * 'fromSyncSource'=true, which guarantees we are on the same branch of history as * 'committedOpTime', so we update our commit point to min(committedOpTime, lastApplied). */ - bool advanceLastCommittedOpTime(OpTime committedOpTime, bool fromSyncSource); + bool advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime committedOpTimeAndWallTime, + bool fromSyncSource); /** * Returns the OpTime of the latest majority-committed op known to this server. */ OpTime getLastCommittedOpTime() const; + OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const; + /** * Returns true if it's safe to transition to LeaderMode::kMaster. */ @@ -935,7 +938,7 @@ private: Date_t _electionSleepUntil; // OpTime of the latest committed operation. - OpTime _lastCommittedOpTime; + OpTimeAndWallTime _lastCommittedOpTimeAndWallTime; // OpTime representing our transition to PRIMARY and the start of our term. // _lastCommittedOpTime cannot be set to an earlier OpTime. 
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index f7b9d212ecf..83ecf28486a 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -116,14 +116,20 @@ protected: ASSERT_OK(getTopoCoord().completeTransitionToPrimary(dummyOpTime)); } - void setMyOpTime(const OpTime& opTime, const Date_t wallTime = Date_t::min()) { + void setMyOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getTopoCoord().setMyLastAppliedOpTimeAndWallTime({opTime, wallTime}, now(), false); } void topoCoordSetMyLastAppliedOpTime(const OpTime& opTime, Date_t now, bool isRollbackAllowed, - const Date_t wallTime = Date_t::min()) { + Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getTopoCoord().setMyLastAppliedOpTimeAndWallTime( {opTime, wallTime}, now, isRollbackAllowed); } @@ -131,11 +137,23 @@ protected: void topoCoordSetMyLastDurableOpTime(const OpTime& opTime, Date_t now, bool isRollbackAllowed, - const Date_t wallTime = Date_t::min()) { + Date_t wallTime = Date_t::min()) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } getTopoCoord().setMyLastDurableOpTimeAndWallTime( {opTime, wallTime}, now, isRollbackAllowed); } + void topoCoordAdvanceLastCommittedOpTime(const OpTime& opTime, + Date_t wallTime = Date_t::min(), + const bool fromSyncSource = false) { + if (wallTime == Date_t::min()) { + wallTime = wallTime + Seconds(opTime.getSecs()); + } + getTopoCoord().advanceLastCommittedOpTimeAndWallTime({opTime, wallTime}, fromSyncSource); + } + void setSelfMemberState(const MemberState& newState) { getTopoCoord().changeMemberState_forTest(newState); } @@ -189,7 +207,7 @@ protected: int primaryIndex = -1, int syncSourceIndex = -1) { return 
ReplSetMetadata(_topo->getTerm(), - OpTime(), + OpTimeAndWallTime(), visibleOpTime, _currentConfig.getConfigVersion(), OID(), @@ -201,8 +219,10 @@ protected: // Only set lastAppliedOpTime, primaryIndex and syncSourceIndex OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(), int primaryIndex = -1, - int syncSourceIndex = -1) { - return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); + int syncSourceIndex = -1, + Date_t lastCommittedWall = Date_t::min()) { + return OplogQueryMetadata( + {OpTime(), lastCommittedWall}, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); } HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, @@ -261,12 +281,20 @@ private: Timestamp electionTime, const OpTime& lastOpTimeSender, Milliseconds roundTripTime, - const HostAndPort& syncingTo) { + const HostAndPort& syncingTo, + Date_t lastDurableWallTime = Date_t::min(), + Date_t lastAppliedWallTime = Date_t::min()) { + if (lastDurableWallTime == Date_t::min()) { + lastDurableWallTime = lastDurableWallTime + Seconds(lastOpTimeSender.getSecs()); + } + if (lastAppliedWallTime == Date_t::min()) { + lastAppliedWallTime = lastAppliedWallTime + Seconds(lastOpTimeSender.getSecs()); + } ReplSetHeartbeatResponse hb; hb.setConfigVersion(1); hb.setState(memberState); - hb.setDurableOpTime(lastOpTimeSender); - hb.setAppliedOpTime(lastOpTimeSender); + hb.setDurableOpTimeAndWallTime({lastOpTimeSender, lastDurableWallTime}); + hb.setAppliedOpTimeAndWallTime({lastOpTimeSender, lastAppliedWallTime}); hb.setElectionTime(electionTime); hb.setTerm(getTopoCoord().getTerm()); hb.setSyncingTo(syncingTo); @@ -1599,8 +1627,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { Date_t curTime = heartbeatTime + uptimeSecs; Timestamp electionTime(1, 2); OpTime oplogProgress(Timestamp(3, 1), 20); + Date_t appliedWallTime = Date_t() + Seconds(oplogProgress.getSecs()); OpTime oplogDurable(Timestamp(1, 1), 19); + Date_t durableWallTime = Date_t() + 
Seconds(oplogDurable.getSecs()); + ; OpTime lastCommittedOpTime(Timestamp(5, 1), 20); + Date_t lastCommittedWallTime = Date_t() + Seconds(lastCommittedOpTime.getSecs()); OpTime readConcernMajorityOpTime(Timestamp(4, 1), 20); Timestamp lastStableRecoveryTimestamp(2, 2); Timestamp lastStableCheckpointTimestampDeprecated(2, 2); @@ -1611,8 +1643,8 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { hb.setConfigVersion(1); hb.setState(MemberState::RS_SECONDARY); hb.setElectionTime(electionTime); - hb.setDurableOpTime(oplogDurable); - hb.setAppliedOpTime(oplogProgress); + hb.setDurableOpTimeAndWallTime({oplogDurable, durableWallTime}); + hb.setAppliedOpTimeAndWallTime({oplogProgress, appliedWallTime}); StatusWith<ReplSetHeartbeatResponse> hbResponseGood = StatusWith<ReplSetHeartbeatResponse>(hb); updateConfig(BSON("_id" << setName << "version" << 1 << "members" @@ -1647,9 +1679,9 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { getTopoCoord().processHeartbeatResponse( heartbeatTime, Milliseconds(4000), member, hbResponseGood); makeSelfPrimary(electionTime); - topoCoordSetMyLastAppliedOpTime(oplogProgress, startupTime, false); - topoCoordSetMyLastDurableOpTime(oplogDurable, startupTime, false); - getTopoCoord().advanceLastCommittedOpTime(lastCommittedOpTime, false); + topoCoordSetMyLastAppliedOpTime(oplogProgress, startupTime, false, appliedWallTime); + topoCoordSetMyLastDurableOpTime(oplogDurable, startupTime, false, durableWallTime); + topoCoordAdvanceLastCommittedOpTime(lastCommittedOpTime, lastCommittedWallTime, false); // Now node 0 is down, node 1 is up, and for node 2 we have no heartbeat data yet. 
BSONObjBuilder statusBuilder; @@ -1681,8 +1713,11 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { ASSERT_BSONOBJ_EQ(readConcernMajorityOpTime.toBSON(), optimes["readConcernMajorityOpTime"].Obj()); ASSERT_BSONOBJ_EQ(oplogProgress.toBSON(), optimes["appliedOpTime"].Obj()); + ASSERT_EQUALS(appliedWallTime, optimes["lastAppliedWallTime"].Date()); ASSERT_BSONOBJ_EQ((oplogDurable).toBSON(), optimes["durableOpTime"].Obj()); + ASSERT_EQUALS(durableWallTime, optimes["lastDurableWallTime"].Date()); ASSERT_BSONOBJ_EQ(lastCommittedOpTime.toBSON(), optimes["lastCommittedOpTime"].Obj()); + ASSERT_EQUALS(lastCommittedWallTime, optimes["lastCommittedWallTime"].Date()); } std::vector<BSONElement> memberArray = rsStatus["members"].Array(); ASSERT_EQUALS(4U, memberArray.size()); @@ -3626,7 +3661,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host4" since "host4" is absent from the config of version 10. 
- ReplSetMetadata replMetadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ReplSetMetadata replMetadata(0, {OpTime(), Date_t::min()}, OpTime(), 10, OID(), -1, -1); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( HostAndPort("host4"), replMetadata, makeOplogQueryMetadata(), now())); } @@ -3665,9 +3700,12 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep 0); ReplSetHeartbeatResponse hb; - hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_PRIMARY), 0) + hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_PRIMARY), + 0, + /*requireWallTime*/ true) .transitional_ignore(); - hb.setDurableOpTime(lastOpTimeApplied); + hb.setDurableOpTimeAndWallTime( + {lastOpTimeApplied, Date_t::min() + Seconds(lastOpTimeApplied.getSecs())}); hb.setElectionTime(election.getTimestamp()); StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse( @@ -3713,15 +3751,21 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) { ReplSetHeartbeatResponse hb; hb.initialize(BSON("ok" << 1 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() + << "durableWallTime" + << Date_t::min() + Seconds(100) << "opTime" << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" + << Date_t::min() + Seconds(100) << "v" << 1 << "state" << MemberState::RS_PRIMARY), - 0) + 0, + /*requireWallTime*/ true) .transitional_ignore(); - hb.setDurableOpTime(lastOpTimeApplied); + hb.setDurableOpTimeAndWallTime( + {lastOpTimeApplied, Date_t::min() + Seconds(lastOpTimeApplied.getSecs())}); hb.setElectionTime(election.getTimestamp()); StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); topoCoordSetMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false); @@ -3882,14 +3926,16 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) { setSelfMemberState(MemberState::RS_SECONDARY); 
OpTime currentOptime(Timestamp(200, 1), 0); + Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs()); OpTime behindOptime(Timestamp(100, 1), 0); + Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs()); // Create a mock heartbeat response to be able to compare who is the freshest node. // The latest heartbeat responses are looked at for determining the latest optime // and therefore freshness for catchup takeover. ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse(); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(currentOptime); + hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime}); hbResp.setTerm(1); Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z")); @@ -3903,7 +3949,7 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) { Milliseconds(999), HostAndPort("host3:27017"), StatusWith<ReplSetHeartbeatResponse>(hbResp)); - hbResp.setAppliedOpTime(behindOptime); + hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime}); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), @@ -3937,12 +3983,14 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) { setSelfMemberState(MemberState::RS_SECONDARY); OpTime currentOptime(Timestamp(200, 1), 0); + Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs()); OpTime behindOptime(Timestamp(100, 1), 0); + Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs()); // Create a mock heartbeat response to be able to compare who is the freshest node. 
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse(); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(currentOptime); + hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime}); hbResp.setTerm(1); Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z")); @@ -3956,7 +4004,7 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) { Milliseconds(999), HostAndPort("host3:27017"), StatusWith<ReplSetHeartbeatResponse>(hbResp)); - hbResp.setAppliedOpTime(behindOptime); + hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime}); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), @@ -3994,11 +4042,12 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) { setSelfMemberState(MemberState::RS_SECONDARY); OpTime currentOptime(Timestamp(200, 1), 0); + Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs()); // Create a mock heartbeat response to be able to compare who is the freshest node. ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse(); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(currentOptime); + hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime}); hbResp.setTerm(1); Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z")); @@ -4050,11 +4099,13 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp OpTime currentOptime(Timestamp(200, 1), 1); OpTime behindOptime(Timestamp(100, 1), 0); + Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs()); + Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs()); // Create a mock heartbeat response to be able to compare who is the freshest node. 
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse(); hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setAppliedOpTime(currentOptime); + hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime}); hbResp.setTerm(1); Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z")); @@ -4070,7 +4121,7 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp Milliseconds(999), HostAndPort("host3:27017"), StatusWith<ReplSetHeartbeatResponse>(hbResp)); - hbResp.setAppliedOpTime(behindOptime); + hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime}); hbResp.setState(MemberState::RS_PRIMARY); getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000), Milliseconds(999), diff --git a/src/mongo/db/repl/update_position_args.cpp b/src/mongo/db/repl/update_position_args.cpp index 1e164187844..70dad16f1ca 100644 --- a/src/mongo/db/repl/update_position_args.cpp +++ b/src/mongo/db/repl/update_position_args.cpp @@ -43,17 +43,26 @@ namespace repl { const char UpdatePositionArgs::kCommandFieldName[] = "replSetUpdatePosition"; const char UpdatePositionArgs::kUpdateArrayFieldName[] = "optimes"; const char UpdatePositionArgs::kAppliedOpTimeFieldName[] = "appliedOpTime"; +const char UpdatePositionArgs::kAppliedWallTimeFieldName[] = "appliedWallTime"; const char UpdatePositionArgs::kDurableOpTimeFieldName[] = "durableOpTime"; +const char UpdatePositionArgs::kDurableWallTimeFieldName[] = "durableWallTime"; const char UpdatePositionArgs::kMemberIdFieldName[] = "memberId"; const char UpdatePositionArgs::kConfigVersionFieldName[] = "cfgver"; UpdatePositionArgs::UpdateInfo::UpdateInfo(const OpTime& applied, + const Date_t& appliedWall, const OpTime& durable, + const Date_t& durableWall, long long aCfgver, long long aMemberId) - : appliedOpTime(applied), durableOpTime(durable), cfgver(aCfgver), memberId(aMemberId) {} - -Status UpdatePositionArgs::initialize(const BSONObj& argsObj) { + : 
appliedOpTime(applied), + appliedWallTime(appliedWall), + durableOpTime(durable), + durableWallTime(durableWall), + cfgver(aCfgver), + memberId(aMemberId) {} + +Status UpdatePositionArgs::initialize(const BSONObj& argsObj, bool requireWallTime) { // grab the array of changes BSONElement updateArray; Status status = bsonExtractTypedField(argsObj, kUpdateArrayFieldName, Array, &updateArray); @@ -70,6 +79,24 @@ Status UpdatePositionArgs::initialize(const BSONObj& argsObj) { if (!status.isOK()) return status; + Date_t appliedWallTime = Date_t::min(); + BSONElement appliedWallTimeElement; + status = bsonExtractTypedField( + entry, kAppliedWallTimeFieldName, BSONType::Date, &appliedWallTimeElement); + if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) + return status; + if (status.isOK()) + appliedWallTime = appliedWallTimeElement.Date(); + + Date_t durableWallTime = Date_t::min(); + BSONElement durableWallTimeElement; + status = bsonExtractTypedField( + entry, kDurableWallTimeFieldName, BSONType::Date, &durableWallTimeElement); + if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) + return status; + if (status.isOK()) + durableWallTime = durableWallTimeElement.Date(); + OpTime durableOpTime; status = bsonExtractOpTimeField(entry, kDurableOpTimeFieldName, &durableOpTime); if (!status.isOK()) @@ -87,7 +114,8 @@ Status UpdatePositionArgs::initialize(const BSONObj& argsObj) { if (!status.isOK()) return status; - _updates.push_back(UpdateInfo(appliedOpTime, durableOpTime, cfgver, memberID)); + _updates.push_back(UpdateInfo( + appliedOpTime, appliedWallTime, durableOpTime, durableWallTime, cfgver, memberID)); } return Status::OK(); diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h index f260c53d02f..37c9a536eed 100644 --- a/src/mongo/db/repl/update_position_args.h +++ b/src/mongo/db/repl/update_position_args.h @@ -48,18 +48,24 @@ public: static const char kCommandFieldName[]; 
static const char kUpdateArrayFieldName[]; static const char kAppliedOpTimeFieldName[]; + static const char kAppliedWallTimeFieldName[]; static const char kDurableOpTimeFieldName[]; + static const char kDurableWallTimeFieldName[]; static const char kMemberIdFieldName[]; static const char kConfigVersionFieldName[]; struct UpdateInfo { UpdateInfo(const OpTime& applied, + const Date_t& appliedWall, const OpTime& durable, + const Date_t& durableWall, long long aCfgver, long long aMemberId); OpTime appliedOpTime; + Date_t appliedWallTime; OpTime durableOpTime; + Date_t durableWallTime; long long cfgver; long long memberId; }; @@ -69,7 +75,7 @@ public: /** * Initializes this UpdatePositionArgs from the contents of "argsObj". */ - Status initialize(const BSONObj& argsObj); + Status initialize(const BSONObj& argsObj, bool requireWallTime); /** * Gets a begin iterator over the UpdateInfos stored in this UpdatePositionArgs. diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index bba03ff5688..3bc08634541 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -400,7 +400,8 @@ bool ReplicationCoordinatorEmbedded::shouldChangeSyncSource( UASSERT_NOT_IMPLEMENTED; } -void ReplicationCoordinatorEmbedded::advanceCommitPoint(const OpTime&, bool fromSyncSource) { +void ReplicationCoordinatorEmbedded::advanceCommitPoint(const OpTimeAndWallTime&, + bool fromSyncSource) { UASSERT_NOT_IMPLEMENTED; } @@ -408,6 +409,10 @@ OpTime ReplicationCoordinatorEmbedded::getLastCommittedOpTime() const { UASSERT_NOT_IMPLEMENTED; } +OpTimeAndWallTime ReplicationCoordinatorEmbedded::getLastCommittedOpTimeAndWallTime() const { + UASSERT_NOT_IMPLEMENTED; +} + Status ReplicationCoordinatorEmbedded::processReplSetRequestVotes(OperationContext*, const ReplSetRequestVotesArgs&, ReplSetRequestVotesResponse*) { diff --git 
a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 7501d0a29dc..b695b6b7ea0 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -175,7 +175,8 @@ public: void processReplSetMetadata(const rpc::ReplSetMetadata&) override; - void advanceCommitPoint(const repl::OpTime& committedOpTime, bool fromSyncSource) override; + void advanceCommitPoint(const repl::OpTimeAndWallTime& committedOpTimeAndWallTime, + bool fromSyncSource) override; void cancelAndRescheduleElectionTimeout() override; @@ -212,6 +213,8 @@ public: repl::OpTime getLastCommittedOpTime() const override; + repl::OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; + Status processReplSetRequestVotes(OperationContext*, const repl::ReplSetRequestVotesArgs&, repl::ReplSetRequestVotesResponse*) override; diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.cpp b/src/mongo/rpc/metadata/oplog_query_metadata.cpp index 59d19207f20..4548928d1ca 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata.cpp +++ b/src/mongo/rpc/metadata/oplog_query_metadata.cpp @@ -39,12 +39,14 @@ namespace mongo { namespace rpc { using repl::OpTime; +using repl::OpTimeAndWallTime; const char kOplogQueryMetadataFieldName[] = "$oplogQueryData"; namespace { const char kLastOpCommittedFieldName[] = "lastOpCommitted"; +const char kLastCommittedWallFieldName[] = "lastCommittedWall"; const char kLastOpAppliedFieldName[] = "lastOpApplied"; const char kPrimaryIndexFieldName[] = "primaryIndex"; const char kSyncSourceIndexFieldName[] = "syncSourceIndex"; @@ -54,7 +56,7 @@ const char kRBIDFieldName[] = "rbid"; const int OplogQueryMetadata::kNoPrimary; -OplogQueryMetadata::OplogQueryMetadata(OpTime lastOpCommitted, +OplogQueryMetadata::OplogQueryMetadata(OpTimeAndWallTime lastOpCommitted, OpTime lastOpApplied, int rbid, int currentPrimaryIndex, @@ -65,7 +67,8 @@ 
OplogQueryMetadata::OplogQueryMetadata(OpTime lastOpCommitted, _currentPrimaryIndex(currentPrimaryIndex), _currentSyncSourceIndex(currentSyncSourceIndex) {} -StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONObj& metadataObj) { +StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONObj& metadataObj, + bool requireWallTime) { BSONElement oqMetadataElement; Status status = bsonExtractTypedField( @@ -89,11 +92,20 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb if (!status.isOK()) return status; - repl::OpTime lastOpCommitted; - status = bsonExtractOpTimeField(oqMetadataObj, kLastOpCommittedFieldName, &lastOpCommitted); + repl::OpTimeAndWallTime lastOpCommitted; + status = + bsonExtractOpTimeField(oqMetadataObj, kLastOpCommittedFieldName, &(lastOpCommitted.opTime)); if (!status.isOK()) return status; + BSONElement wallClockTimeElement; + status = bsonExtractTypedField( + oqMetadataObj, kLastCommittedWallFieldName, BSONType::Date, &wallClockTimeElement); + if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) + return status; + if (status.isOK()) + lastOpCommitted.wallTime = wallClockTimeElement.Date(); + repl::OpTime lastOpApplied; status = bsonExtractOpTimeField(oqMetadataObj, kLastOpAppliedFieldName, &lastOpApplied); if (!status.isOK()) @@ -104,7 +116,8 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const { BSONObjBuilder oqMetadataBuilder(builder->subobjStart(kOplogQueryMetadataFieldName)); - _lastOpCommitted.append(&oqMetadataBuilder, kLastOpCommittedFieldName); + _lastOpCommitted.opTime.append(&oqMetadataBuilder, kLastOpCommittedFieldName); + oqMetadataBuilder.appendDate(kLastCommittedWallFieldName, _lastOpCommitted.wallTime); _lastOpApplied.append(&oqMetadataBuilder, kLastOpAppliedFieldName); oqMetadataBuilder.append(kRBIDFieldName, _rbid); 
oqMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex); diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.h b/src/mongo/rpc/metadata/oplog_query_metadata.h index d9a830bcf67..1f8cc2705be 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata.h +++ b/src/mongo/rpc/metadata/oplog_query_metadata.h @@ -53,7 +53,7 @@ public: static const int kNoPrimary = -1; OplogQueryMetadata() = default; - OplogQueryMetadata(repl::OpTime lastOpCommitted, + OplogQueryMetadata(repl::OpTimeAndWallTime lastOpCommitted, repl::OpTime lastOpApplied, int rbid, int currentPrimaryIndex, @@ -63,19 +63,23 @@ public: * format: * { * lastOpCommitted: {ts: Timestamp(0, 0), term: 0}, + * lastCommittedWall: ISODate("2018-07-25T19:21:22.449Z") * lastOpApplied: {ts: Timestamp(0, 0), term: 0}, * rbid: 0 * primaryIndex: 0, * syncSourceIndex: 0 * } + * requireWallTime is only false if FCV is less than 4.2 or the wall clock time is not read from + * this particular OplogQueryMetadata instance. */ - static StatusWith<OplogQueryMetadata> readFromMetadata(const BSONObj& doc); + static StatusWith<OplogQueryMetadata> readFromMetadata(const BSONObj& doc, + bool requireWallTime); Status writeToMetadata(BSONObjBuilder* builder) const; /** * Returns the OpTime of the most recently committed op of which the sender was aware. 
*/ - repl::OpTime getLastOpCommitted() const { + repl::OpTimeAndWallTime getLastOpCommitted() const { return _lastOpCommitted; } @@ -115,7 +119,7 @@ public: std::string toString() const; private: - repl::OpTime _lastOpCommitted; + repl::OpTimeAndWallTime _lastOpCommitted; repl::OpTime _lastOpApplied; int _rbid = -1; int _currentPrimaryIndex = kNoPrimary; diff --git a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp index fb73b359c17..859e8bbf10c 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp +++ b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp @@ -41,10 +41,11 @@ using repl::OpTime; TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) { OpTime opTime1(Timestamp(1234, 100), 5); + Date_t committedWall = Date_t::min() + Seconds(opTime1.getSecs()); OpTime opTime2(Timestamp(7777, 101), 6); - OplogQueryMetadata metadata(opTime1, opTime2, 6, 12, -1); + OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1); - ASSERT_EQ(opTime1, metadata.getLastOpCommitted()); + ASSERT_EQ(opTime1, metadata.getLastOpCommitted().opTime); ASSERT_EQ(opTime2, metadata.getLastOpApplied()); BSONObjBuilder builder; @@ -53,6 +54,8 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) { BSONObj expectedObj(BSON(kOplogQueryMetadataFieldName << BSON( "lastOpCommitted" << BSON("ts" << opTime1.getTimestamp() << "t" << opTime1.getTerm()) + << "lastCommittedWall" + << committedWall << "lastOpApplied" << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) << "rbid" @@ -65,12 +68,14 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) { BSONObj serializedObj = builder.obj(); ASSERT_BSONOBJ_EQ(expectedObj, serializedObj); - auto cloneStatus = OplogQueryMetadata::readFromMetadata(serializedObj); + auto cloneStatus = + OplogQueryMetadata::readFromMetadata(serializedObj, /*requireWallTime*/ true); ASSERT_OK(cloneStatus.getStatus()); const auto& clonedMetadata = 
cloneStatus.getValue(); - ASSERT_EQ(opTime1, clonedMetadata.getLastOpCommitted()); + ASSERT_EQ(opTime1, clonedMetadata.getLastOpCommitted().opTime); ASSERT_EQ(opTime2, clonedMetadata.getLastOpApplied()); + ASSERT_EQ(committedWall, clonedMetadata.getLastOpCommitted().wallTime); ASSERT_EQ(metadata.getRBID(), clonedMetadata.getRBID()); ASSERT_EQ(metadata.getPrimaryIndex(), clonedMetadata.getPrimaryIndex()); ASSERT_EQ(metadata.getSyncSourceIndex(), clonedMetadata.getSyncSourceIndex()); diff --git a/src/mongo/rpc/metadata/repl_set_metadata.cpp b/src/mongo/rpc/metadata/repl_set_metadata.cpp index 4064bc498c6..12cd36df089 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.cpp +++ b/src/mongo/rpc/metadata/repl_set_metadata.cpp @@ -39,12 +39,14 @@ namespace mongo { namespace rpc { using repl::OpTime; +using repl::OpTimeAndWallTime; const char kReplSetMetadataFieldName[] = "$replData"; namespace { const char kLastOpCommittedFieldName[] = "lastOpCommitted"; +const char kLastCommittedWallFieldName[] = "lastCommittedWall"; const char kLastOpVisibleFieldName[] = "lastOpVisible"; const char kConfigVersionFieldName[] = "configVersion"; const char kReplicaSetIdFieldName[] = "replicaSetId"; @@ -57,7 +59,7 @@ const char kTermFieldName[] = "term"; const int ReplSetMetadata::kNoPrimary; ReplSetMetadata::ReplSetMetadata(long long term, - OpTime committedOpTime, + OpTimeAndWallTime committedOpTime, OpTime visibleOpTime, long long configVersion, OID id, @@ -71,7 +73,8 @@ ReplSetMetadata::ReplSetMetadata(long long term, _currentPrimaryIndex(currentPrimaryIndex), _currentSyncSourceIndex(currentSyncSourceIndex) {} -StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& metadataObj) { +StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& metadataObj, + bool requireWallTime) { BSONElement replMetadataElement; Status status = bsonExtractTypedField( @@ -108,17 +111,32 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& 
met if (!status.isOK()) return status; - repl::OpTime lastOpCommitted; - status = bsonExtractOpTimeField(replMetadataObj, kLastOpCommittedFieldName, &lastOpCommitted); + repl::OpTimeAndWallTime lastOpCommitted; + auto lastCommittedStatus = bsonExtractOpTimeField( + replMetadataObj, kLastOpCommittedFieldName, &(lastOpCommitted.opTime)); // We check for NoSuchKey because these fields will be removed in SERVER-27668. - if (!status.isOK() && status != ErrorCodes::NoSuchKey) - return status; + if (!lastCommittedStatus.isOK() && lastCommittedStatus != ErrorCodes::NoSuchKey) + return lastCommittedStatus; repl::OpTime lastOpVisible; status = bsonExtractOpTimeField(replMetadataObj, kLastOpVisibleFieldName, &lastOpVisible); if (!status.isOK() && status != ErrorCodes::NoSuchKey) return status; + BSONElement wallClockTimeElement; + status = bsonExtractTypedField( + replMetadataObj, kLastCommittedWallFieldName, BSONType::Date, &wallClockTimeElement); + + // Last committed OpTime is optional, so if last committed OpTime is missing, do not check for + // last committed wall clock time. Last committed wall clock time is also only required if + // FCV is 4.2. 
+ if (!status.isOK() && lastCommittedStatus != ErrorCodes::NoSuchKey && + (status != ErrorCodes::NoSuchKey || requireWallTime)) + return status; + if (status.isOK()) { + lastOpCommitted.wallTime = wallClockTimeElement.Date(); + } + return ReplSetMetadata( term, lastOpCommitted, lastOpVisible, configVersion, id, primaryIndex, syncSourceIndex); } @@ -126,7 +144,8 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& met Status ReplSetMetadata::writeToMetadata(BSONObjBuilder* builder) const { BSONObjBuilder replMetadataBuilder(builder->subobjStart(kReplSetMetadataFieldName)); replMetadataBuilder.append(kTermFieldName, _currentTerm); - _lastOpCommitted.append(&replMetadataBuilder, kLastOpCommittedFieldName); + _lastOpCommitted.opTime.append(&replMetadataBuilder, kLastOpCommittedFieldName); + replMetadataBuilder.appendDate(kLastCommittedWallFieldName, _lastOpCommitted.wallTime); _lastOpVisible.append(&replMetadataBuilder, kLastOpVisibleFieldName); replMetadataBuilder.append(kConfigVersionFieldName, _configVersion); replMetadataBuilder.append(kReplicaSetIdFieldName, _replicaSetId); diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h index 927b559ee82..7559120ac0c 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.h +++ b/src/mongo/rpc/metadata/repl_set_metadata.h @@ -54,7 +54,7 @@ public: ReplSetMetadata() = default; ReplSetMetadata(long long term, - repl::OpTime committedOpTime, + repl::OpTimeAndWallTime committedOpTime, repl::OpTime visibleOpTime, long long configVersion, OID replicaSetId, @@ -72,8 +72,10 @@ public: * primaryIndex: 0, * syncSourceIndex: 0 * } + * requireWallTime is only false if FCV is less than 4.2 or the wall clock time is not read from + * this particular ReplSetMetadata instance. 
*/ - static StatusWith<ReplSetMetadata> readFromMetadata(const BSONObj& doc); + static StatusWith<ReplSetMetadata> readFromMetadata(const BSONObj& doc, bool requireWallTime); Status writeToMetadata(BSONObjBuilder* builder) const; /** @@ -86,7 +88,7 @@ public: /** * Returns the OpTime of the most recently committed op of which the sender was aware. */ - repl::OpTime getLastOpCommitted() const { + repl::OpTimeAndWallTime getLastOpCommitted() const { return _lastOpCommitted; } @@ -140,7 +142,7 @@ public: std::string toString() const; private: - repl::OpTime _lastOpCommitted; + repl::OpTimeAndWallTime _lastOpCommitted; repl::OpTime _lastOpVisible; long long _currentTerm = -1; long long _configVersion = -1; diff --git a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp index 1baa1000dd3..36e447889ec 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp +++ b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp @@ -38,17 +38,21 @@ namespace rpc { namespace { using repl::OpTime; +using repl::OpTimeAndWallTime; TEST(ReplResponseMetadataTest, ReplicaSetIdNotSet) { - ASSERT_FALSE(ReplSetMetadata(3, OpTime(), OpTime(), 6, OID(), 12, -1).hasReplicaSetId()); + ASSERT_FALSE( + ReplSetMetadata(3, OpTimeAndWallTime(), OpTime(), 6, OID(), 12, -1).hasReplicaSetId()); } TEST(ReplResponseMetadataTest, Roundtrip) { OpTime opTime(Timestamp(1234, 100), 5); OpTime opTime2(Timestamp(7777, 100), 6); - ReplSetMetadata metadata(3, opTime, opTime2, 6, OID::gen(), 12, -1); + Date_t committedWallTime = Date_t::min() + Seconds(opTime.getSecs()); + ReplSetMetadata metadata(3, {opTime, committedWallTime}, opTime2, 6, OID::gen(), 12, -1); - ASSERT_EQ(opTime, metadata.getLastOpCommitted()); + ASSERT_EQ(opTime, metadata.getLastOpCommitted().opTime); + ASSERT_EQ(committedWallTime, metadata.getLastOpCommitted().wallTime); ASSERT_EQ(opTime2, metadata.getLastOpVisible()); ASSERT_TRUE(metadata.hasReplicaSetId()); @@ -59,6 +63,8 @@ 
TEST(ReplResponseMetadataTest, Roundtrip) { BSON(kReplSetMetadataFieldName << BSON("term" << 3 << "lastOpCommitted" << BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm()) + << "lastCommittedWall" + << committedWallTime << "lastOpVisible" << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) << "configVersion" @@ -73,12 +79,13 @@ TEST(ReplResponseMetadataTest, Roundtrip) { BSONObj serializedObj = builder.obj(); ASSERT_BSONOBJ_EQ(expectedObj, serializedObj); - auto cloneStatus = ReplSetMetadata::readFromMetadata(serializedObj); + auto cloneStatus = ReplSetMetadata::readFromMetadata(serializedObj, /*requireWallTime*/ true); ASSERT_OK(cloneStatus.getStatus()); const auto& clonedMetadata = cloneStatus.getValue(); - ASSERT_EQ(opTime, clonedMetadata.getLastOpCommitted()); + ASSERT_EQ(opTime, clonedMetadata.getLastOpCommitted().opTime); ASSERT_EQ(opTime2, clonedMetadata.getLastOpVisible()); + ASSERT_EQ(committedWallTime, clonedMetadata.getLastOpCommitted().wallTime); ASSERT_EQ(metadata.getConfigVersion(), clonedMetadata.getConfigVersion()); ASSERT_EQ(metadata.getReplicaSetId(), clonedMetadata.getReplicaSetId()); @@ -94,7 +101,7 @@ TEST(ReplResponseMetadataTest, MetadataCanBeConstructedWhenMissingOplogQueryMeta BSONObj obj(BSON(kReplSetMetadataFieldName << BSON("term" << 3 << "configVersion" << 6 << "replicaSetId" << id))); - auto status = ReplSetMetadata::readFromMetadata(obj); + auto status = ReplSetMetadata::readFromMetadata(obj, /*requireWallTime*/ true); ASSERT_OK(status.getStatus()); const auto& metadata = status.getValue(); diff --git a/src/mongo/s/catalog/sharding_catalog_test.cpp b/src/mongo/s/catalog/sharding_catalog_test.cpp index 26e1255306c..a9d0959a61c 100644 --- a/src/mongo/s/catalog/sharding_catalog_test.cpp +++ b/src/mongo/s/catalog/sharding_catalog_test.cpp @@ -119,7 +119,13 @@ TEST_F(ShardingCatalogClientTest, GetCollectionExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - 
ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1); + ReplSetMetadata metadata(10, + {newOpTime, Date_t::min() + Seconds(newOpTime.getSecs())}, + newOpTime, + 100, + OID(), + 30, + -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder).transitional_ignore(); @@ -186,7 +192,13 @@ TEST_F(ShardingCatalogClientTest, GetDatabaseExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1); + ReplSetMetadata metadata(10, + {newOpTime, Date_t::min() + Seconds(newOpTime.getSecs())}, + newOpTime, + 100, + OID(), + 30, + -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder).transitional_ignore(); @@ -405,7 +417,13 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSWithSortAndLimit) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1); + ReplSetMetadata metadata(10, + {newOpTime, Date_t::min() + Seconds(newOpTime.getSecs())}, + newOpTime, + 100, + OID(), + 30, + -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder).transitional_ignore(); @@ -806,7 +824,13 @@ TEST_F(ShardingCatalogClientTest, GetCollectionsValidResultsNoDb) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1); + ReplSetMetadata metadata(10, + {newOpTime, Date_t::min() + Seconds(newOpTime.getSecs())}, + newOpTime, + 100, + OID(), + 30, + -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder).transitional_ignore(); diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 84275dcd102..5e5b5b3ecb9 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -272,8 +272,9 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand( const auto& data = 
dataStatus.getValue(); if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { - auto replParseStatus = - rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); + // Sharding users of ReplSetMetadata do not require the wall clock time field to be set + auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata( + data.otherFields.metadata, /*requireWallTime*/ false); if (!replParseStatus.isOK()) { status = replParseStatus.getStatus(); response.docs.clear(); @@ -281,7 +282,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand( } const auto& replSetMetadata = replParseStatus.getValue(); - response.opTime = replSetMetadata.getLastOpCommitted(); + response.opTime = replSetMetadata.getLastOpCommitted().opTime; } for (const BSONObj& doc : data.documents) { diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp index faea59f04dd..0bafa375784 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.cpp +++ b/src/mongo/s/sharding_egress_metadata_hook.cpp @@ -90,7 +90,9 @@ Status ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(ShardId shardId if (shard->isConfig()) { // Config servers return the config opTime as part of their own metadata. if (metadataObj.hasField(rpc::kReplSetMetadataFieldName)) { - auto parseStatus = rpc::ReplSetMetadata::readFromMetadata(metadataObj); + // Sharding users of ReplSetMetadata do not use the wall clock time field. + auto parseStatus = + rpc::ReplSetMetadata::readFromMetadata(metadataObj, /*requireWallTime*/ false); if (!parseStatus.isOK()) { return parseStatus.getStatus(); } @@ -103,7 +105,7 @@ Status ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(ShardId shardId // is safe to use. 
const auto& replMetadata = parseStatus.getValue(); auto opTime = replMetadata.getLastOpCommitted(); - grid->advanceConfigOpTime(opTime); + grid->advanceConfigOpTime(opTime.opTime); } } else { // Regular shards return the config opTime as part of ConfigServerMetadata. |