summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp1
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h1
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp5
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change_test.cpp8
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp3
-rw-r--r--src/mongo/db/repl/member_data.cpp34
-rw-r--r--src/mongo/db/repl/member_data.h20
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp27
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp82
-rw-r--r--src/mongo/db/repl/optime.h12
-rw-r--r--src/mongo/db/repl/repl_set_commands.cpp19
-rw-r--r--src/mongo/db/repl/repl_set_heartbeat_response.cpp46
-rw-r--r--src/mongo/db/repl/repl_set_heartbeat_response.h17
-rw-r--r--src/mongo/db/repl/repl_set_heartbeat_response_test.cpp236
-rw-r--r--src/mongo/db/repl/replication_coordinator.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp55
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp200
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp14
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp25
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp76
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp1122
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp20
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h33
-rw-r--r--src/mongo/db/repl/reporter.cpp2
-rw-r--r--src/mongo/db/repl/reporter_test.cpp4
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp95
-rw-r--r--src/mongo/db/repl/topology_coordinator.h9
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp103
-rw-r--r--src/mongo/db/repl/update_position_args.cpp36
-rw-r--r--src/mongo/db/repl/update_position_args.h8
34 files changed, 1448 insertions, 896 deletions
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
index 7d39509d971..801548b6686 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
@@ -112,6 +112,7 @@ void AbstractOplogFetcherTest::setUp() {
launchExecutorThread();
lastFetched = {{123, 0}, 1};
+ lastFetchedWall = Date_t::min() + Seconds(lastFetched.getSecs());
}
executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse(
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
index 9d1ec170ec8..2164f93cac6 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
@@ -94,6 +94,7 @@ protected:
// The last OpTime fetched by the oplog fetcher.
OpTime lastFetched;
+ Date_t lastFetchedWall;
};
} // namespace repl
} // namespace mango
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index ec0ea421550..9c8b9eefe12 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/server_options.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
@@ -195,7 +196,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque
BSONObj resBSON = response.data;
ReplSetHeartbeatResponse hbResp;
- Status hbStatus = hbResp.initialize(resBSON, 0);
+ Status hbStatus = hbResp.initialize(resBSON, 0, /*requireWallTime*/ false);
if (hbStatus.code() == ErrorCodes::InconsistentReplicaSetNames) {
std::string message = str::stream() << "Our set name did not match that of "
@@ -226,7 +227,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque
if (_rsConfig->hasReplicaSetId()) {
StatusWith<rpc::ReplSetMetadata> replMetadata =
- rpc::ReplSetMetadata::readFromMetadata(response.data);
+ rpc::ReplSetMetadata::readFromMetadata(response.data, /*requireWallTime*/ false);
if (replMetadata.isOK() && replMetadata.getValue().getReplicaSetId().isSet() &&
_rsConfig->getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) {
std::string message = str::stream()
diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
index afd4bdf79b1..36807a63c2e 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
@@ -239,8 +239,9 @@ executor::RemoteCommandResponse makeHeartbeatResponse(const ReplSetConfig& rsCon
hbResp.setConfigVersion(configVersion);
// The smallest valid optime in PV1.
OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ Date_t wallTime = Date_t::min();
+ hbResp.setAppliedOpTimeAndWallTime({opTime, wallTime});
+ hbResp.setDurableOpTimeAndWallTime({opTime, wallTime});
auto bob = BSONObjBuilder(hbResp.toBSON());
bob.appendElements(extraFields);
return RemoteCommandResponse(bob.obj(), duration_cast<Milliseconds>(millis));
@@ -469,8 +470,9 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) {
<< request.target.toString();
if (request.target == incompatibleHost) {
OpTime opTime{Timestamp{10, 10}, 10};
+ Date_t wallTime = Date_t::min();
rpc::ReplSetMetadata metadata(opTime.getTerm(),
- opTime,
+ {opTime, wallTime},
opTime,
rsConfig.getConfigVersion(),
unexpectedId,
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index c1075aa9dbb..cba04681a31 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -79,8 +79,7 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp
void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& replMetadata,
rpc::OplogQueryMetadata oqMetadata) {
- OpTime newCommitPoint;
- newCommitPoint = oqMetadata.getLastOpCommitted();
+ OpTimeAndWallTime newCommitPoint = oqMetadata.getLastOpCommitted();
const bool fromSyncSource = true;
_replicationCoordinator->advanceCommitPoint(newCommitPoint, fromSyncSource);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index ac82555ff60..2eb1d27e5ef 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -518,7 +518,8 @@ RemoteCommandResponse makeCursorResponse(CursorId cursorId,
bool isFirstBatch = true,
int rbid = 1) {
OpTime futureOpTime(Timestamp(1000, 1000), 1000);
- rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0);
+ Date_t futureWallTime = Date_t::min() + Seconds(futureOpTime.getSecs());
+ rpc::OplogQueryMetadata oqMetadata({futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0);
BSONObjBuilder bob;
{
diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp
index 18cb910496e..197376cd15c 100644
--- a/src/mongo/db/repl/member_data.cpp
+++ b/src/mongo/db/repl/member_data.cpp
@@ -43,7 +43,7 @@ namespace repl {
MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _isSelf(false) {
_lastResponse.setState(MemberState::RS_UNKNOWN);
_lastResponse.setElectionTime(Timestamp());
- _lastResponse.setAppliedOpTime(OpTime());
+ _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime());
}
bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) {
@@ -65,7 +65,7 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
hbResponse.setElectionTime(_lastResponse.getElectionTime());
}
if (!hbResponse.hasAppliedOpTime()) {
- hbResponse.setAppliedOpTime(_lastResponse.getAppliedOpTime());
+ hbResponse.setAppliedOpTimeAndWallTime(_lastResponse.getAppliedOpTimeAndWallTime());
}
// Log if the state changes
if (_lastResponse.getState() != hbResponse.getState()) {
@@ -73,9 +73,13 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
<< hbResponse.getState().toString() << rsLog;
}
- bool opTimeAdvanced = advanceLastAppliedOpTime(hbResponse.getAppliedOpTime(), now);
- auto durableOpTime = hbResponse.hasDurableOpTime() ? hbResponse.getDurableOpTime() : OpTime();
- opTimeAdvanced = advanceLastDurableOpTime(durableOpTime, now) || opTimeAdvanced;
+ bool opTimeAdvanced =
+ advanceLastAppliedOpTimeAndWallTime(hbResponse.getAppliedOpTimeAndWallTime(), now);
+ auto durableOpTimeAndWallTime = hbResponse.hasDurableOpTime()
+ ? hbResponse.getDurableOpTimeAndWallTime()
+ : OpTimeAndWallTime();
+ opTimeAdvanced =
+ advanceLastDurableOpTimeAndWallTime(durableOpTimeAndWallTime, now) || opTimeAdvanced;
_lastResponse = std::move(hbResponse);
return opTimeAdvanced;
}
@@ -95,7 +99,7 @@ void MemberData::setDownValues(Date_t now, const std::string& heartbeatMessage)
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_DOWN);
_lastResponse.setElectionTime(Timestamp());
- _lastResponse.setAppliedOpTime(OpTime());
+ _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime());
_lastResponse.setSyncingTo(HostAndPort());
// The _lastAppliedOpTime/_lastDurableOpTime fields don't get cleared merely by missing a
@@ -118,7 +122,7 @@ void MemberData::setAuthIssue(Date_t now) {
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_UNKNOWN);
_lastResponse.setElectionTime(Timestamp());
- _lastResponse.setAppliedOpTime(OpTime());
+ _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime());
_lastResponse.setSyncingTo(HostAndPort());
}
@@ -129,10 +133,6 @@ void MemberData::setLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_
_lastAppliedWallTime = opTime.wallTime;
}
-void MemberData::setLastAppliedOpTime(OpTime opTime, Date_t now) {
- setLastAppliedOpTimeAndWallTime({opTime, Date_t::min()}, now);
-}
-
void MemberData::setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) {
_lastUpdate = now;
_lastUpdateStale = false;
@@ -150,10 +150,6 @@ void MemberData::setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_
}
}
-void MemberData::setLastDurableOpTime(OpTime opTime, Date_t now) {
- setLastDurableOpTimeAndWallTime({opTime, Date_t::min()}, now);
-}
-
bool MemberData::advanceLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) {
_lastUpdate = now;
_lastUpdateStale = false;
@@ -164,10 +160,6 @@ bool MemberData::advanceLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, D
return false;
}
-bool MemberData::advanceLastAppliedOpTime(OpTime opTime, Date_t now) {
- return advanceLastAppliedOpTimeAndWallTime({opTime, Date_t::min()}, now);
-}
-
bool MemberData::advanceLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now) {
_lastUpdate = now;
_lastUpdateStale = false;
@@ -179,9 +171,5 @@ bool MemberData::advanceLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, D
return false;
}
-bool MemberData::advanceLastDurableOpTime(OpTime opTime, Date_t now) {
- return advanceLastDurableOpTimeAndWallTime({opTime, Date_t::min()}, now);
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h
index cae0e30b4c9..d8887317f83 100644
--- a/src/mongo/db/repl/member_data.h
+++ b/src/mongo/db/repl/member_data.h
@@ -172,37 +172,21 @@ public:
}
/**
- * Sets the last applied op time (not the heartbeat applied op time) and updates the
- * lastUpdate time.
- */
- void setLastAppliedOpTime(OpTime opTime, Date_t now);
-
- /**
* Performs setLastAppliedOpTime and also sets the wall clock time corresponding to the last
* applied opTime. Should only be used on the current node.
*/
void setLastAppliedOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now);
/**
- * Sets the last durable op time (not the heartbeat durable op time)
- */
- void setLastDurableOpTime(OpTime opTime, Date_t now);
-
- /**
* Performs setLastDurableOpTime and also sets the wall clock time corresponding to the last
* durable opTime. Should only be used on the current node.
*/
void setLastDurableOpTimeAndWallTime(OpTimeAndWallTime opTime, Date_t now);
-
/**
* Sets the last applied op time (not the heartbeat applied op time) iff the new optime is
* later than the current optime, and updates the lastUpdate time. Returns true if the
* optime was advanced.
- */
- bool advanceLastAppliedOpTime(OpTime opTime, Date_t now);
-
- /**
* Performs advanceLastAppliedOpTime and also sets the wall clock time corresponding to the last
* applied opTime. Should only be used on the current node.
*/
@@ -212,10 +196,6 @@ public:
* Sets the last durable op time (not the heartbeat applied op time) iff the new optime is
* later than the current optime, and updates the lastUpdate time. Returns true if the
* optime was advanced.
- */
- bool advanceLastDurableOpTime(OpTime opTime, Date_t now);
-
- /**
* Performs advanceLastDurableOpTime and also sets the wall clock time corresponding to the last
* durable opTime. Should only be used on the current node.
*/
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 84ebd79b4da..a432da62bb4 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -229,7 +229,19 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
queryResponse.otherFields.metadata.hasElement(rpc::kOplogQueryMetadataFieldName);
if (receivedOplogQueryMetadata) {
const auto& metadataObj = queryResponse.otherFields.metadata;
- auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadataObj);
+ // Wall clock times are required in OplogQueryMetadata when FCV is 4.2. Arbiters trivially
+ // have FCV equal to 4.2, so they are excluded from this check.
+ bool isArbiter = hasGlobalServiceContext() &&
+ repl::ReplicationCoordinator::get(getGlobalServiceContext()) &&
+ repl::ReplicationCoordinator::get(getGlobalServiceContext())->getMemberState() ==
+ MemberState::RS_ARBITER;
+ bool requireWallTime =
+ (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ !isArbiter);
+ auto metadataResult =
+ rpc::OplogQueryMetadata::readFromMetadata(metadataObj, requireWallTime);
if (!metadataResult.isOK()) {
return metadataResult.getStatus();
}
@@ -469,7 +481,18 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
if (receivedReplMetadata) {
const auto& metadataObj = queryResponse.otherFields.metadata;
- auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
+ // Wall clock times are required in ReplSetMetadata when FCV is 4.2. Arbiters trivially
+ // have FCV equal to 4.2, so they are excluded from this check.
+ bool isArbiter = hasGlobalServiceContext() &&
+ repl::ReplicationCoordinator::get(getGlobalServiceContext()) &&
+ repl::ReplicationCoordinator::get(getGlobalServiceContext())->getMemberState() ==
+ MemberState::RS_ARBITER;
+ bool requireWallTime =
+ (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ !isArbiter);
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj, requireWallTime);
if (!metadataResult.isOK()) {
error() << "invalid replication metadata from sync source " << _getSource() << ": "
<< metadataResult.getStatus() << ": " << metadataObj;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 532821b3cd8..233735db58e 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -94,6 +94,7 @@ protected:
OpTime remoteNewerOpTime;
OpTime staleOpTime;
+ Date_t staleWallTime;
int rbid;
std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
@@ -110,6 +111,7 @@ void OplogFetcherTest::setUp() {
remoteNewerOpTime = {{124, 1}, 2};
staleOpTime = {{1, 1}, 0};
+ staleWallTime = Date_t::min() + Seconds(staleOpTime.getSecs());
rbid = 2;
dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
@@ -130,7 +132,7 @@ BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
int primaryIndex,
int syncSourceIndex) {
rpc::OplogQueryMetadata oqMetadata(
- staleOpTime, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex);
+ {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex);
BSONObjBuilder bob;
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
return bob.obj();
@@ -284,7 +286,8 @@ TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher
DEATH_TEST_F(OplogFetcherTest,
ValidMetadataInResponseWithoutOplogMetadataInvariants,
"Invariant failure oqMetadata") {
- rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2);
+ rpc::ReplSetMetadata metadata(
+ 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2);
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
@@ -295,8 +298,9 @@ DEATH_TEST_F(OplogFetcherTest,
}
TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -314,8 +318,10 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe
}
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid + 1, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -332,8 +338,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack)
}
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -350,8 +357,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind)
}
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -369,8 +377,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
TEST_F(OplogFetcherTest,
MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -390,8 +399,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM
// This tests the case where the sync source metadata is behind us but we get a document which
// is equal to us. Since that means the metadata is stale and can be ignored, we should accept
// this sync source.
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -406,8 +416,9 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM
TEST_F(OplogFetcherTest,
MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -422,7 +433,8 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest,
MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
- rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2);
+ rpc::ReplSetMetadata metadata(
+ 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2);
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
@@ -436,8 +448,9 @@ TEST_F(OplogFetcherTest,
}
TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
- rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2);
+ rpc::ReplSetMetadata replMetadata(
+ 1, {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -757,9 +770,14 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetche
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
rpc::ReplSetMetadata replMetadata(
- lastFetched.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ lastFetched.getTerm(), {OpTime(), Date_t::min()}, OpTime(), 1, OID::gen(), -1, -1);
+ OpTime committedOpTime = {{Seconds(10000), 0}, 1};
rpc::OplogQueryMetadata oqMetadata(
- {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, 2);
+ {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())},
+ {{Seconds(20000), 0}, 1},
+ rbid,
+ 2,
+ 2);
testSyncSourceChecking(&replMetadata, &oqMetadata);
@@ -771,15 +789,21 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc
TEST_F(OplogFetcherTest,
FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
- rpc::ReplSetMetadata replMetadata(lastFetched.getTerm(),
- {{Seconds(10000), 0}, 1},
- {{Seconds(20000), 0}, 1},
- 1,
- OID::gen(),
- 2,
- 2);
+ OpTime committedOpTime = {{Seconds(10000), 0}, 1};
+ rpc::ReplSetMetadata replMetadata(
+ lastFetched.getTerm(),
+ {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())},
+ {{Seconds(20000), 0}, 1},
+ 1,
+ OID::gen(),
+ 2,
+ 2);
rpc::OplogQueryMetadata oqMetadata(
- {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, -1);
+ {committedOpTime, Date_t::min() + Seconds(committedOpTime.getSecs())},
+ {{Seconds(20000), 0}, 1},
+ rbid,
+ 2,
+ -1);
testSyncSourceChecking(&replMetadata, &oqMetadata);
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index c1b377f2845..8edb4ccde96 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -162,7 +162,17 @@ private:
struct OpTimeAndWallTime {
OpTime opTime;
- Date_t wallTime;
+ Date_t wallTime = Date_t::min();
+ inline bool operator==(const OpTimeAndWallTime& rhs) const {
+ return opTime == rhs.opTime && wallTime == rhs.wallTime;
+ }
+ inline bool operator<(const OpTimeAndWallTime& rhs) const {
+ // Wall clock time ordering should not matter for calculations of the commit point.
+ return opTime < rhs.opTime;
+ }
+ std::string toString() const {
+ return opTime.toString() + ", " + wallTime.toString();
+ }
};
std::ostream& operator<<(std::ostream& out, const OpTimeAndWallTime& opTime);
} // namespace repl
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index e347a80f7da..6fdb826d85b 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -578,7 +578,16 @@ public:
if (cmdObj.hasField("handshake"))
return true;
- auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj);
+ // Wall clock times are required in ReplSetMetadata when FCV is 4.2. Arbiters trivially
+ // have FCV equal to 4.2, so they are excluded from this check.
+ bool isArbiter = replCoord->getMemberState() == MemberState::RS_ARBITER;
+ bool requireWallTime =
+ (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ !isArbiter);
+
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj, requireWallTime);
if (metadataResult.isOK()) {
// New style update position command has metadata, which may inform the
// upstream of a higher term.
@@ -592,7 +601,13 @@ public:
UpdatePositionArgs args;
- status = args.initialize(cmdObj);
+ // re-check requireWallTime
+ requireWallTime =
+ (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ !isArbiter);
+ status = args.initialize(cmdObj, requireWallTime);
if (status.isOK()) {
status = replCoord->processReplSetUpdatePosition(args, &configVersion);
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.cpp b/src/mongo/db/repl/repl_set_heartbeat_response.cpp
index a312970aad3..2c1f5f88734 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_response.cpp
+++ b/src/mongo/db/repl/repl_set_heartbeat_response.cpp
@@ -39,6 +39,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/bson_extract_optime.h"
+#include "mongo/db/server_options.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -54,7 +55,9 @@ const std::string kElectionTimeFieldName = "electionTime";
const std::string kMemberStateFieldName = "state";
const std::string kOkFieldName = "ok";
const std::string kDurableOpTimeFieldName = "durableOpTime";
+const std::string kDurableWallTimeFieldName = "durableWallTime";
const std::string kAppliedOpTimeFieldName = "opTime";
+const std::string kAppliedWallTimeFieldName = "wallTime";
const std::string kPrimaryIdFieldName = "primaryId";
const std::string kReplSetFieldName = "set";
const std::string kSyncSourceFieldName = "syncingTo";
@@ -92,9 +95,11 @@ void ReplSetHeartbeatResponse::addToBSON(BSONObjBuilder* builder) const {
}
if (_durableOpTimeSet) {
_durableOpTime.append(builder, kDurableOpTimeFieldName);
+ builder->appendDate(kDurableWallTimeFieldName, _durableWallTime);
}
if (_appliedOpTimeSet) {
_appliedOpTime.append(builder, kAppliedOpTimeFieldName);
+ builder->appendDate(kAppliedWallTimeFieldName, _appliedWallTime);
}
}
@@ -104,7 +109,9 @@ BSONObj ReplSetHeartbeatResponse::toBSON() const {
return builder.obj();
}
-Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term) {
+Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc,
+ long long term,
+ bool requireWallTime) {
auto status = getStatusFromCommandResult(doc);
if (!status.isOK()) {
return status;
@@ -146,13 +153,40 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term)
if (!status.isOK()) {
return status;
}
+
+ BSONElement durableWallTimeElement;
+ _durableWallTime = Date_t::min();
+ status = bsonExtractTypedField(
+ doc, kDurableWallTimeFieldName, BSONType::Date, &durableWallTimeElement);
+ if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) {
+ // We ignore NoSuchKey errors if the FeatureCompatibilityVersion is less than 4.2, since
+ // older version nodes may not report wall clock times.
+ return status;
+ }
+ if (status.isOK()) {
+ _durableWallTime = durableWallTimeElement.Date();
+ }
_durableOpTimeSet = true;
+
// In V1, heartbeats OpTime is type Object and we construct an OpTime out of its nested fields.
status = bsonExtractOpTimeField(doc, kAppliedOpTimeFieldName, &_appliedOpTime);
if (!status.isOK()) {
return status;
}
+
+ BSONElement appliedWallTimeElement;
+ _appliedWallTime = Date_t::min();
+ status = bsonExtractTypedField(
+ doc, kAppliedWallTimeFieldName, BSONType::Date, &appliedWallTimeElement);
+ if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime)) {
+ // We ignore NoSuchKey errors if the FeatureCompatibilityVersion is less than 4.2, since
+ // older version nodes may not report wall clock times.
+ return status;
+ }
+ if (status.isOK()) {
+ _appliedWallTime = appliedWallTimeElement.Date();
+ }
_appliedOpTimeSet = true;
const BSONElement memberStateElement = doc[kMemberStateFieldName];
@@ -250,10 +284,20 @@ OpTime ReplSetHeartbeatResponse::getAppliedOpTime() const {
return _appliedOpTime;
}
+OpTimeAndWallTime ReplSetHeartbeatResponse::getAppliedOpTimeAndWallTime() const {
+ invariant(_appliedOpTimeSet);
+ return {_appliedOpTime, _appliedWallTime};
+}
+
OpTime ReplSetHeartbeatResponse::getDurableOpTime() const {
invariant(_durableOpTimeSet);
return _durableOpTime;
}
+OpTimeAndWallTime ReplSetHeartbeatResponse::getDurableOpTimeAndWallTime() const {
+ invariant(_durableOpTimeSet);
+ return {_durableOpTime, _durableWallTime};
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.h b/src/mongo/db/repl/repl_set_heartbeat_response.h
index 2c5fb91538f..3b883d4f017 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_response.h
+++ b/src/mongo/db/repl/repl_set_heartbeat_response.h
@@ -52,8 +52,9 @@ public:
/**
* Initializes this ReplSetHeartbeatResponse from the contents of "doc".
* "term" is only used to complete a V0 OpTime (which is really a Timestamp).
+ * Only processes wall clock time elements if FCV is 4.2 (i.e., requireWallTime is true).
*/
- Status initialize(const BSONObj& doc, long long term);
+ Status initialize(const BSONObj& doc, long long term, bool requireWallTime);
/**
* Appends all non-default values to "builder".
@@ -104,10 +105,12 @@ public:
return _appliedOpTimeSet;
}
OpTime getAppliedOpTime() const;
+ OpTimeAndWallTime getAppliedOpTimeAndWallTime() const;
bool hasDurableOpTime() const {
return _durableOpTimeSet;
}
OpTime getDurableOpTime() const;
+ OpTimeAndWallTime getDurableOpTimeAndWallTime() const;
/**
* Sets _setName to "name".
@@ -158,13 +161,15 @@ public:
_primaryIdSet = true;
_primaryId = primaryId;
}
- void setAppliedOpTime(OpTime time) {
+ void setAppliedOpTimeAndWallTime(OpTimeAndWallTime time) {
_appliedOpTimeSet = true;
- _appliedOpTime = time;
+ _appliedOpTime = time.opTime;
+ _appliedWallTime = time.wallTime;
}
- void setDurableOpTime(OpTime time) {
+ void setDurableOpTimeAndWallTime(OpTimeAndWallTime time) {
_durableOpTimeSet = true;
- _durableOpTime = time;
+ _durableOpTime = time.opTime;
+ _durableWallTime = time.wallTime;
}
void setTerm(long long term) {
_term = term;
@@ -176,9 +181,11 @@ private:
bool _appliedOpTimeSet = false;
OpTime _appliedOpTime;
+ Date_t _appliedWallTime;
bool _durableOpTimeSet = false;
OpTime _durableOpTime;
+ Date_t _durableWallTime;
bool _stateSet = false;
MemberState _state;
diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
index 5c5ec8df7e3..72c7d31190a 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
+++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp
@@ -48,6 +48,10 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
int fieldsSet = 1;
ReplSetHeartbeatResponse hbResponse;
ReplSetHeartbeatResponse hbResponseObjRoundTripChecker;
+ OpTime durableOpTime = OpTime(Timestamp(10), 0);
+ Date_t durableWallTime = Date_t::min() + Seconds(durableOpTime.getSecs());
+ OpTime appliedOpTime = OpTime(Timestamp(50), 0);
+ Date_t appliedWallTime = Date_t::min() + Seconds(appliedOpTime.getSecs());
ASSERT_EQUALS(false, hbResponse.hasState());
ASSERT_EQUALS(false, hbResponse.hasElectionTime());
ASSERT_EQUALS(false, hbResponse.hasDurableOpTime());
@@ -73,11 +77,11 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
hbResponse.setElectionTime(Timestamp(10, 0));
++fieldsSet;
// set durableOpTime
- hbResponse.setDurableOpTime(OpTime(Timestamp(10), 0));
- ++fieldsSet;
+ hbResponse.setDurableOpTimeAndWallTime({durableOpTime, durableWallTime});
+ fieldsSet += 2; // OpTime and WallTime are separate fields
// set appliedOpTime
- hbResponse.setAppliedOpTime(OpTime(Timestamp(50), 0));
- ++fieldsSet;
+ hbResponse.setAppliedOpTimeAndWallTime({appliedOpTime, appliedWallTime});
+ fieldsSet += 2; // OpTime and WallTime are separate fields
// set config
ReplSetConfig config;
hbResponse.setConfig(config);
@@ -99,8 +103,10 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
ASSERT_EQUALS(HostAndPort("syncTarget"), hbResponse.getSyncingTo());
ASSERT_EQUALS(1, hbResponse.getConfigVersion());
ASSERT_EQUALS(Timestamp(10, 0), hbResponse.getElectionTime());
- ASSERT_EQUALS(OpTime(Timestamp(0, 10), 0), hbResponse.getDurableOpTime());
- ASSERT_EQUALS(OpTime(Timestamp(0, 50), 0), hbResponse.getAppliedOpTime());
+ ASSERT_EQUALS(durableOpTime, hbResponse.getDurableOpTime());
+ ASSERT_EQUALS(durableWallTime, hbResponse.getDurableOpTimeAndWallTime().wallTime);
+ ASSERT_EQUALS(appliedOpTime, hbResponse.getAppliedOpTime());
+ ASSERT_EQUALS(appliedWallTime, hbResponse.getAppliedOpTimeAndWallTime().wallTime);
ASSERT_EQUALS(config.toBSON().toString(), hbResponse.getConfig().toBSON().toString());
hbResponseObj = hbResponse.toBSON();
@@ -114,7 +120,8 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
ASSERT_EQUALS(2, hbResponseObj["state"].numberLong());
ASSERT_EQUALS("syncTarget:27017", hbResponseObj["syncingTo"].String());
- initializeResult = hbResponseObjRoundTripChecker.initialize(hbResponseObj, 0);
+ initializeResult =
+ hbResponseObjRoundTripChecker.initialize(hbResponseObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(Status::OK(), initializeResult);
ASSERT_EQUALS(hbResponseObj.toString(), hbResponseObjRoundTripChecker.toBSON().toString());
}
@@ -123,7 +130,7 @@ TEST(ReplSetHeartbeatResponse, InitializeWrongElectionTimeType) {
ReplSetHeartbeatResponse hbResponse;
BSONObj initializerObj = BSON("ok" << 1.0 << "electionTime"
<< "hello");
- Status result = hbResponse.initialize(initializerObj, 0);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"electionTime\" field in response to replSetHeartbeat command to "
@@ -135,44 +142,74 @@ TEST(ReplSetHeartbeatResponse, InitializeWrongDurableOpTimeType) {
ReplSetHeartbeatResponse hbResponse;
BSONObj initializerObj = BSON("ok" << 1.0 << "durableOpTime"
<< "hello");
- Status result = hbResponse.initialize(initializerObj, 0);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS("\"durableOpTime\" had the wrong type. Expected object, found string",
result.reason());
BSONObj initializerObj2 = BSON("ok" << 1.0 << "durableOpTime" << OpTime().getTimestamp());
- Status result2 = hbResponse.initialize(initializerObj2, 0);
+ Status result2 = hbResponse.initialize(initializerObj2, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result2);
ASSERT_EQUALS("\"durableOpTime\" had the wrong type. Expected object, found timestamp",
result2.reason());
}
-TEST(ReplSetHeartbeatResponse, InitializeWrongAppliedOpTimeType) {
+TEST(ReplSetHeartbeatResponse, InitializeNoDurableWallTime) {
ReplSetHeartbeatResponse hbResponse;
BSONObj initializerObj =
BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << "hello");
- Status result = hbResponse.initialize(initializerObj, 0);
+ << OpTime(Timestamp(100, 0), 0).toBSON());
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result);
+ ASSERT_EQUALS("Missing expected field \"durableWallTime\"", result.reason());
+}
+
+TEST(ReplSetHeartbeatResponse, InitializeWrongAppliedOpTimeType) {
+ ReplSetHeartbeatResponse hbResponse;
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << "hello");
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS("\"opTime\" had the wrong type. Expected object, found string", result.reason());
- initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime().getTimestamp());
- result = hbResponse.initialize(initializerObj, 0);
+ initializerObj = BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime().getTimestamp());
+ result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS("\"opTime\" had the wrong type. Expected object, found timestamp",
result.reason());
}
+TEST(ReplSetHeartbeatResponse, InitializeNoAppliedWallTime) {
+ ReplSetHeartbeatResponse hbResponse;
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON());
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result);
+ ASSERT_EQUALS("Missing expected field \"wallTime\"", result.reason());
+}
+
TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "state"
- << "hello");
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "state"
+ << "hello");
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"state\" field in response to replSetHeartbeat command to "
@@ -182,12 +219,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) {
TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooLow) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "state"
- << -1);
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "state"
+ << -1);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::BadValue, result);
ASSERT_EQUALS(
"Value for \"state\" in response to replSetHeartbeat is out of range; "
@@ -197,12 +238,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooLow) {
TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooHigh) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "state"
- << 11);
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "state"
+ << 11);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::BadValue, result);
ASSERT_EQUALS(
"Value for \"state\" in response to replSetHeartbeat is out of range; "
@@ -212,12 +257,16 @@ TEST(ReplSetHeartbeatResponse, InitializeMemberStateTooHigh) {
TEST(ReplSetHeartbeatResponse, InitializeVersionWrongType) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "v"
- << "hello");
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "v"
+ << "hello");
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"v\" field in response to replSetHeartbeat to "
@@ -227,14 +276,18 @@ TEST(ReplSetHeartbeatResponse, InitializeVersionWrongType) {
TEST(ReplSetHeartbeatResponse, InitializeReplSetNameWrongType) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "v"
- << 2 // needs a version to get this far in initialize()
- << "set"
- << 4);
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "v"
+ << 2 // needs a version to get this far in initialize()
+ << "set"
+ << 4);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"set\" field in response to replSetHeartbeat to "
@@ -244,14 +297,18 @@ TEST(ReplSetHeartbeatResponse, InitializeReplSetNameWrongType) {
TEST(ReplSetHeartbeatResponse, InitializeSyncingToWrongType) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "v"
- << 2 // needs a version to get this far in initialize()
- << "syncingTo"
- << 4);
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "v"
+ << 2 // needs a version to get this far in initialize()
+ << "syncingTo"
+ << 4);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"syncingTo\" field in response to replSetHeartbeat to "
@@ -261,14 +318,18 @@ TEST(ReplSetHeartbeatResponse, InitializeSyncingToWrongType) {
TEST(ReplSetHeartbeatResponse, InitializeConfigWrongType) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "v"
- << 2 // needs a version to get this far in initialize()
- << "config"
- << 4);
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "v"
+ << 2 // needs a version to get this far in initialize()
+ << "config"
+ << 4);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::TypeMismatch, result);
ASSERT_EQUALS(
"Expected \"config\" in response to replSetHeartbeat to "
@@ -278,14 +339,18 @@ TEST(ReplSetHeartbeatResponse, InitializeConfigWrongType) {
TEST(ReplSetHeartbeatResponse, InitializeBadConfig) {
ReplSetHeartbeatResponse hbResponse;
- BSONObj initializerObj =
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()
- << "v"
- << 2 // needs a version to get this far in initialize()
- << "config"
- << BSON("illegalFieldName" << 2));
- Status result = hbResponse.initialize(initializerObj, 0);
+ BSONObj initializerObj = BSON(
+ "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
+ << "v"
+ << 2 // needs a version to get this far in initialize()
+ << "config"
+ << BSON("illegalFieldName" << 2));
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::BadValue, result);
ASSERT_EQUALS("Unexpected field illegalFieldName in replica set configuration",
result.reason());
@@ -298,16 +363,22 @@ TEST(ReplSetHeartbeatResponse, NoConfigStillInitializing) {
BSONObj initializerObj =
BSON("ok" << 0.0 << "code" << ErrorCodes::NotYetInitialized << "errmsg"
<< "Received heartbeat while still initializing replication system.");
- Status result = hbResp.initialize(initializerObj, 0);
+ Status result = hbResp.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::NotYetInitialized, result.code());
}
TEST(ReplSetHeartbeatResponse, InvalidResponseOpTimeMissesConfigVersion) {
ReplSetHeartbeatResponse hbResp;
- Status result = hbResp.initialize(
- BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "opTime"
- << OpTime(Timestamp(100, 0), 0).toBSON()),
- 0);
+ Status result = hbResp.initialize(BSON("ok" << 1.0 << "durableOpTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "durableWallTime"
+ << Date_t::min() + Seconds(100)
+ << "opTime"
+ << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)),
+ 0,
+ /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.code());
ASSERT_TRUE(stringContains(result.reason(), "\"v\""))
<< result.reason() << " doesn't contain 'v' field required error msg";
@@ -318,7 +389,7 @@ TEST(ReplSetHeartbeatResponse, MismatchedReplicaSetNames) {
BSONObj initializerObj =
BSON("ok" << 0.0 << "code" << ErrorCodes::InconsistentReplicaSetNames << "errmsg"
<< "replica set name doesn't match.");
- Status result = hbResponse.initialize(initializerObj, 0);
+ Status result = hbResponse.initialize(initializerObj, 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::InconsistentReplicaSetNames, result.code());
}
@@ -326,7 +397,9 @@ TEST(ReplSetHeartbeatResponse, AuthFailure) {
ReplSetHeartbeatResponse hbResp;
std::string errMsg = "Unauthorized";
Status result = hbResp.initialize(
- BSON("ok" << 0.0 << "errmsg" << errMsg << "code" << ErrorCodes::Unauthorized), 0);
+ BSON("ok" << 0.0 << "errmsg" << errMsg << "code" << ErrorCodes::Unauthorized),
+ 0,
+ /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code());
ASSERT_EQUALS(errMsg, result.reason());
}
@@ -334,7 +407,8 @@ TEST(ReplSetHeartbeatResponse, AuthFailure) {
TEST(ReplSetHeartbeatResponse, ServerError) {
ReplSetHeartbeatResponse hbResp;
std::string errMsg = "Random Error";
- Status result = hbResp.initialize(BSON("ok" << 0.0 << "errmsg" << errMsg), 0);
+ Status result =
+ hbResp.initialize(BSON("ok" << 0.0 << "errmsg" << errMsg), 0, /*requireWallTime*/ true);
ASSERT_EQUALS(ErrorCodes::UnknownError, result.code());
ASSERT_EQUALS(errMsg, result.reason());
}
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 31a54cbb470..47be83249f7 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -620,7 +620,8 @@ public:
* the same branch of history as 'committedOptime', so we update our commit point to
* min(committedOptime, lastApplied).
*/
- virtual void advanceCommitPoint(const OpTime& committedOptime, bool fromSyncSource) = 0;
+ virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOpTimeAndWallTime,
+ bool fromSyncSource) = 0;
/**
* Elections under protocol version 1 are triggered by a timer.
@@ -755,6 +756,7 @@ public:
* operation in their oplogs. This implies such ops will never be rolled back.
*/
virtual OpTime getLastCommittedOpTime() const = 0;
+ virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const = 0;
/**
* Returns a list of objects that contain this node's knowledge of the state of the members of
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f5bede64b57..6670e7c6ec6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1045,7 +1045,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
// Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
// our election in onTransitionToPrimary(), above.
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
// Update _canAcceptNonLocalWrites
_updateMemberStateFromTopologyCoordinator(lk, opCtx);
@@ -1187,7 +1187,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime(
opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
// If we are using applied times to calculate the commit level, update it now.
if (!_rsConfig.getWriteConcernMajorityShouldJournal()) {
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
}
// Signal anyone waiting on optime changes.
@@ -1237,7 +1237,7 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime(
opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
// If we are using durable times to calculate the commit level, update it now.
if (_rsConfig.getWriteConcernMajorityShouldJournal()) {
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
}
}
@@ -1523,10 +1523,15 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
- const UpdatePositionArgs::UpdateInfo update(OpTime(), opTime, cfgVer, memberId);
+ const UpdatePositionArgs::UpdateInfo update(OpTime(),
+ Date_t::min(),
+ opTime,
+ Date_t::min() + Seconds(opTime.getSecs()),
+ cfgVer,
+ memberId);
long long configVersion;
const auto status = _setLastOptime(lock, update, &configVersion);
- _updateLastCommittedOpTime(lock);
+ _updateLastCommittedOpTimeAndWallTime(lock);
return status;
}
@@ -1536,10 +1541,15 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
- const UpdatePositionArgs::UpdateInfo update(opTime, OpTime(), cfgVer, memberId);
+ const UpdatePositionArgs::UpdateInfo update(opTime,
+ Date_t::min() + Seconds(opTime.getSecs()),
+ OpTime(),
+ Date_t::min(),
+ cfgVer,
+ memberId);
long long configVersion;
const auto status = _setLastOptime(lock, update, &configVersion);
- _updateLastCommittedOpTime(lock);
+ _updateLastCommittedOpTimeAndWallTime(lock);
return status;
}
@@ -1552,7 +1562,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime(WithLock lk,
const bool advancedOpTime = result.getValue();
// Only update committed optime if the remote optimes increased.
if (advancedOpTime) {
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
}
_cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
@@ -3092,7 +3102,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
// nodes in the set will contact us.
_startHeartbeats_inlock();
}
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
return action;
}
@@ -3320,8 +3330,8 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
currentSource, replMetadata, oqMetadata, _replExecutor->now());
}
-void ReplicationCoordinatorImpl::_updateLastCommittedOpTime(WithLock lk) {
- if (_topCoord->updateLastCommittedOpTime()) {
+void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) {
+ if (_topCoord->updateLastCommittedOpTimeAndWallTime()) {
_setStableTimestampForStorage(lk);
}
// Wake up any threads waiting for replication that now have their replication
@@ -3490,26 +3500,26 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
}
}
-void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime,
- bool fromSyncSource) {
+void ReplicationCoordinatorImpl::advanceCommitPoint(
+ const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _advanceCommitPoint(lk, committedOpTime, fromSyncSource);
+ _advanceCommitPoint(lk, committedOpTimeAndWallTime, fromSyncSource);
}
-void ReplicationCoordinatorImpl::_advanceCommitPoint(WithLock lk,
- const OpTime& committedOpTime,
- bool fromSyncSource) {
- if (_topCoord->advanceLastCommittedOpTime(committedOpTime, fromSyncSource)) {
+void ReplicationCoordinatorImpl::_advanceCommitPoint(
+ WithLock lk, const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) {
+ if (_topCoord->advanceLastCommittedOpTimeAndWallTime(committedOpTimeAndWallTime,
+ fromSyncSource)) {
if (_getMemberState_inlock().arbiter()) {
// Arbiters do not store replicated data, so we consider their data trivially
// consistent.
_setMyLastAppliedOpTimeAndWallTime(
- lk, {committedOpTime, Date_t::min()}, false, DataConsistency::Consistent);
+ lk, committedOpTimeAndWallTime, false, DataConsistency::Consistent);
}
_setStableTimestampForStorage(lk);
// Even if we have no new snapshot, we need to notify waiters that the commit point moved.
- _externalState->notifyOplogMetadataWaiters(committedOpTime);
+ _externalState->notifyOplogMetadataWaiters(committedOpTimeAndWallTime.opTime);
}
}
@@ -3518,6 +3528,11 @@ OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
return _topCoord->getLastCommittedOpTime();
}
+OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _topCoord->getLastCommittedOpTimeAndWallTime();
+}
+
Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index e5641c38be9..76a6bf693be 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -219,7 +219,8 @@ public:
virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
- virtual void advanceCommitPoint(const OpTime& committedOpTime, bool fromSyncSource) override;
+ virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOpTimeAndWallTime,
+ bool fromSyncSource) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -270,6 +271,7 @@ public:
boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
virtual OpTime getLastCommittedOpTime() const override;
+ virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override;
virtual Status processReplSetRequestVotes(OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
@@ -1068,8 +1070,11 @@ private:
* our lastApplied, unless 'fromSyncSource'=true, which guarantees we are on the same branch of
* history as 'committedOpTime', so we update our commit point to min(committedOpTime,
* lastApplied).
+ * Also updates corresponding wall clock time.
*/
- void _advanceCommitPoint(WithLock lk, const OpTime& committedOpTime, bool fromSyncSource);
+ void _advanceCommitPoint(WithLock lk,
+ const OpTimeAndWallTime& committedOpTimeAndWallTime,
+ bool fromSyncSource);
/**
* Scan the memberData and determine the highest last applied or last
@@ -1080,7 +1085,7 @@ private:
* Whether the last applied or last durable op time is used depends on whether
* the config getWriteConcernMajorityShouldJournal is set.
*/
- void _updateLastCommittedOpTime(WithLock lk);
+ void _updateLastCommittedOpTimeAndWallTime(WithLock lk);
/**
* Callback that attempts to set the current term in topology coordinator and
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index c98888fd2f4..56de352ac68 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -136,8 +136,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().secondary())
<< getReplCoord()->getMemberState().toString();
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
@@ -200,8 +200,8 @@ TEST_F(ReplCoordTest, StartElectionDoesNotStartAnElectionWhenNodeIsRecovering) {
ASSERT(getReplCoord()->getMemberState().recovering())
<< getReplCoord()->getMemberState().toString();
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
simulateEnoughHeartbeatsForAllNodesUp();
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
@@ -221,8 +221,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
<< 1),
HostAndPort("node1", 12345));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(10));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
@@ -259,8 +259,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenAllNodesVoteYea) {
<< 1);
assertStartSuccess(configObj, HostAndPort("node1", 12345));
OperationContextNoop opCtx;
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
startCapturingLogMessages();
simulateSuccessfulV1Election();
@@ -300,8 +300,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenMaxSevenNodesVoteYea) {
<< 1);
assertStartSuccess(configObj, HostAndPort("node1", 12345));
OperationContextNoop opCtx;
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
startCapturingLogMessages();
simulateSuccessfulV1Election();
@@ -337,8 +337,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringDryRun)
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -398,8 +398,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenDryRunResponseContainsANewerTerm) {
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -466,8 +466,8 @@ TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) {
<< 1),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
getGlobalFailPointRegistry()
->getFailPoint("blockHeartbeatReconfigFinish")
@@ -495,8 +495,10 @@ TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) {
hbResp2.setConfigVersion(3);
hbResp2.setSetName("mySet");
hbResp2.setState(MemberState::RS_SECONDARY);
- hbResp2.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp2.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp2.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp2.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
net->runUntil(net->now() + Seconds(10)); // run until we've sent a heartbeat request
const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest();
net->scheduleResponse(noi2, net->now(), makeResponseStatus(hbResp2.toBSON()));
@@ -528,8 +530,10 @@ TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) {
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
BSONObjBuilder respObj;
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
} else {
@@ -590,8 +594,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringRequest
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -640,8 +644,8 @@ TEST_F(ReplCoordTest, TransitionToRollbackFailsWhenElectionInProgress) {
ReplSetConfig config = assertMakeRSConfig(configObj);
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -680,8 +684,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenVoteRequestResponseContainsANewerTerm) {
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -736,8 +740,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringDryRun) {
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -776,8 +780,8 @@ TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) {
OperationContextNoop opCtx;
OpTime time1(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(time1.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateEnoughHeartbeatsForAllNodesUp();
@@ -950,8 +954,10 @@ private:
}
hbResp.setConfigVersion(config.getConfigVersion());
hbResp.setTerm(replCoord->getTerm());
- hbResp.setAppliedOpTime(otherNodesOpTime);
- hbResp.setDurableOpTime(otherNodesOpTime);
+ hbResp.setAppliedOpTimeAndWallTime(
+ {otherNodesOpTime, Date_t::min() + Seconds(otherNodesOpTime.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime(
+ {otherNodesOpTime, Date_t::min() + Seconds(otherNodesOpTime.getSecs())});
auto response = makeResponseStatus(hbResp.toBSON());
net->scheduleResponse(noi, net->now(), response);
}
@@ -983,8 +989,10 @@ TEST_F(TakeoverTest, DoesntScheduleCatchupTakeoverIfCatchupDisabledButTakeoverDe
OperationContextNoop opCtx;
OpTime currentOptime(Timestamp(200, 1), 0);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, 1));
@@ -1026,8 +1034,10 @@ TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfNodeIsFresherThanCurrentPrimary)
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled yet.
@@ -1077,8 +1087,10 @@ TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfBothTakeoversAnOption) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1131,8 +1143,10 @@ TEST_F(TakeoverTest, PrefersPriorityToCatchupTakeoverIfNodeHasHighestPriority) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1181,8 +1195,10 @@ TEST_F(TakeoverTest, CatchupTakeoverNotScheduledTwice) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1246,8 +1262,10 @@ TEST_F(TakeoverTest, CatchupAndPriorityTakeoverNotScheduledAtSameTime) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1304,8 +1322,10 @@ TEST_F(TakeoverTest, CatchupTakeoverCallbackCanceledIfElectionTimeoutRuns) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1377,8 +1397,10 @@ TEST_F(TakeoverTest, CatchupTakeoverCanceledIfTransitionToRollback) {
// and some other node became the new primary. Once you hear about a primary election
// in term 1, your term will be increased.
replCoord->updateTerm_forTest(1, nullptr);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Make sure we're secondary and that no catchup takeover has been scheduled.
@@ -1440,8 +1462,10 @@ TEST_F(TakeoverTest, SuccessfulCatchupTakeover) {
OpTime currentOptime(Timestamp(100, 5000), 0);
OpTime behindOptime(Timestamp(100, 4000), 0);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
// Update the term so that the current term is ahead of the term of
// the last applied op time. This means that the primary is still in
@@ -1511,8 +1535,10 @@ TEST_F(TakeoverTest, CatchupTakeoverDryRunFailsPrimarySaysNo) {
OpTime currentOptime(Timestamp(100, 5000), 0);
OpTime behindOptime(Timestamp(100, 4000), 0);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
// Update the term so that the current term is ahead of the term of
// the last applied op time. This means that the primary is still in
@@ -1613,8 +1639,10 @@ TEST_F(TakeoverTest, PrimaryCatchesUpBeforeCatchupTakeover) {
OperationContextNoop opCtx;
OpTime currentOptime(Timestamp(200, 1), 0);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Update the term so that the current term is ahead of the term of
@@ -1678,8 +1706,10 @@ TEST_F(TakeoverTest, PrimaryCatchesUpBeforeHighPriorityNodeCatchupTakeover) {
OperationContextNoop opCtx;
OpTime currentOptime(Timestamp(200, 1), 0);
- replCoordSetMyLastAppliedOpTime(currentOptime);
- replCoordSetMyLastDurableOpTime(currentOptime);
+ replCoordSetMyLastAppliedOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(currentOptime,
+ Date_t::min() + Seconds(currentOptime.getSecs()));
OpTime behindOptime(Timestamp(100, 1), 0);
// Update the term so that the current term is ahead of the term of
@@ -1760,8 +1790,8 @@ TEST_F(TakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrent
OperationContextNoop opCtx;
OpTime myOptime(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(myOptime);
- replCoordSetMyLastDurableOpTime(myOptime);
+ replCoordSetMyLastAppliedOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs()));
// Make sure we're secondary and that no priority takeover has been scheduled.
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
@@ -1807,8 +1837,8 @@ TEST_F(TakeoverTest, SuccessfulPriorityTakeover) {
OperationContextNoop opCtx;
OpTime myOptime(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(myOptime);
- replCoordSetMyLastDurableOpTime(myOptime);
+ replCoordSetMyLastAppliedOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs()));
+ replCoordSetMyLastDurableOpTime(myOptime, Date_t::min() + Seconds(myOptime.getSecs()));
// Make sure we're secondary and that no priority takeover has been scheduled.
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
@@ -1866,8 +1896,8 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
OpTime behindOpTime(Timestamp(100, 3999), 0);
OpTime closeEnoughOpTime(Timestamp(100, 4000), 0);
- replCoordSetMyLastAppliedOpTime(behindOpTime);
- replCoordSetMyLastDurableOpTime(behindOpTime);
+ replCoordSetMyLastAppliedOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs()));
+ replCoordSetMyLastDurableOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs()));
// Make sure we're secondary and that no priority takeover has been scheduled.
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
@@ -1906,8 +1936,10 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// Now make us caught up enough to call for priority takeover to succeed.
- replCoordSetMyLastAppliedOpTime(closeEnoughOpTime);
- replCoordSetMyLastDurableOpTime(closeEnoughOpTime);
+ replCoordSetMyLastAppliedOpTime(closeEnoughOpTime,
+ Date_t::min() + Seconds(closeEnoughOpTime.getSecs()));
+ replCoordSetMyLastDurableOpTime(closeEnoughOpTime,
+ Date_t::min() + Seconds(closeEnoughOpTime.getSecs()));
LastVote lastVoteExpected = LastVote(replCoord->getTerm() + 1, 0);
performSuccessfulTakeover(priorityTakeoverTime,
@@ -1942,8 +1974,8 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) {
OpTime currentOpTime(Timestamp(100, 1), 0);
OpTime behindOpTime(Timestamp(97, 1), 0);
OpTime closeEnoughOpTime(Timestamp(98, 1), 0);
- replCoordSetMyLastAppliedOpTime(behindOpTime);
- replCoordSetMyLastDurableOpTime(behindOpTime);
+ replCoordSetMyLastAppliedOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs()));
+ replCoordSetMyLastDurableOpTime(behindOpTime, Date_t::min() + Seconds(behindOpTime.getSecs()));
// Make sure we're secondary and that no priority takeover has been scheduled.
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
@@ -1983,8 +2015,10 @@ TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) {
assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);
// Now make us caught up enough to call for priority takeover to succeed.
- replCoordSetMyLastAppliedOpTime(closeEnoughOpTime);
- replCoordSetMyLastDurableOpTime(closeEnoughOpTime);
+ replCoordSetMyLastAppliedOpTime(closeEnoughOpTime,
+ Date_t::min() + Seconds(closeEnoughOpTime.getSecs()));
+ replCoordSetMyLastDurableOpTime(closeEnoughOpTime,
+ Date_t::min() + Seconds(closeEnoughOpTime.getSecs()));
LastVote lastVoteExpected = LastVote(replCoord->getTerm() + 1, 0);
performSuccessfulTakeover(priorityTakeoverTime,
@@ -2011,8 +2045,8 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) {
<< BSON("heartbeatIntervalMillis" << 100)),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateEnoughHeartbeatsForAllNodesUp();
// Advance to dry run vote request phase.
@@ -2076,8 +2110,8 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
<< BSON("heartbeatIntervalMillis" << 100)),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateEnoughHeartbeatsForAllNodesUp();
simulateSuccessfulDryRun();
ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole());
@@ -2120,8 +2154,8 @@ protected:
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())});
return makeResponseStatus(hbResp.toBSON());
}
@@ -2190,8 +2224,8 @@ protected:
assertStartSuccess(configObj, HostAndPort("node1", 12345));
ReplSetConfig config = assertMakeRSConfig(configObj);
- replCoordSetMyLastAppliedOpTime(opTime);
- replCoordSetMyLastDurableOpTime(opTime);
+ replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(opTime.getSecs()));
+ replCoordSetMyLastDurableOpTime(opTime, Date_t::min() + Seconds(opTime.getSecs()));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
simulateSuccessfulV1Voting();
@@ -2254,8 +2288,8 @@ protected:
// Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal
// the optime waiter.
- void advanceMyLastAppliedOpTime(OpTime opTime) {
- replCoordSetMyLastAppliedOpTime(opTime);
+ void advanceMyLastAppliedOpTime(OpTime opTime, Date_t wallTime = Date_t::min()) {
+ replCoordSetMyLastAppliedOpTime(opTime, wallTime);
getNet()->enterNetwork();
getNet()->runReadyNetworkOperations();
getNet()->exitNetwork();
@@ -2300,7 +2334,7 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+ advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs()));
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
@@ -2446,7 +2480,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
});
ReplicationCoordinatorImpl* replCoord = getReplCoord();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+ advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs()));
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest"));
@@ -2508,7 +2542,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
// 3) Advancing its applied optime to time 2 isn't enough.
- advanceMyLastAppliedOpTime(time2);
+ advanceMyLastAppliedOpTime(time2, Date_t::min() + Seconds(time2.getSecs()));
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
// 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
@@ -2528,12 +2562,12 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
// 5) Advancing to time 3 isn't enough now.
- advanceMyLastAppliedOpTime(time3);
+ advanceMyLastAppliedOpTime(time3, Date_t::min() + Seconds(time3.getSecs()));
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
// 6) The node catches up time 4 eventually.
startCapturingLogMessages();
- advanceMyLastAppliedOpTime(time4);
+ advanceMyLastAppliedOpTime(time4, Date_t::min() + Seconds(time4.getSecs()));
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest"));
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index bb330cdb9a7..f5f7650eb71 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -151,9 +151,17 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
BSONObj resp;
if (responseStatus.isOK()) {
resp = cbData.response.data;
- responseStatus = hbResponse.initialize(resp, _topCoord->getTerm());
+ // Wall clock times are required in ReplSetHeartbeatResponse when FCV is 4.2. Arbiters
+ // trivially have FCV equal to 4.2, so they are excluded from this check.
+ bool isArbiter = _topCoord->getMemberState() == MemberState::RS_ARBITER;
+ bool requireWallTime =
+ (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ !isArbiter);
+ responseStatus = hbResponse.initialize(resp, _topCoord->getTerm(), requireWallTime);
StatusWith<rpc::ReplSetMetadata> replMetadata =
- rpc::ReplSetMetadata::readFromMetadata(cbData.response.data);
+ rpc::ReplSetMetadata::readFromMetadata(cbData.response.data, requireWallTime);
LOG_FOR_HEARTBEATS(2) << "Received response to heartbeat (requestId: " << cbData.request.id
<< ") from " << target << ", " << resp;
@@ -229,7 +237,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
hbStatusResponse.getValue().hasState() &&
hbStatusResponse.getValue().getState() != MemberState::RS_PRIMARY &&
action.getAdvancedOpTime()) {
- _updateLastCommittedOpTime(lk);
+ _updateLastCommittedOpTimeAndWallTime(lk);
}
// Abort catchup if we have caught up to the latest known optime after heartbeat refreshing.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 8612d9be65c..361cb5f4883 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -126,8 +126,8 @@ TEST_F(ReplCoordHBV1Test,
hbResp.setConfig(rsConfig);
// The smallest valid optime in PV1.
OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()});
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1;
hbResp.addToBSON(&responseBuilder);
@@ -200,8 +200,8 @@ TEST_F(ReplCoordHBV1Test,
hbResp.setConfig(rsConfig);
// The smallest valid optime in PV1.
OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()});
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1;
hbResp.addToBSON(&responseBuilder);
@@ -274,8 +274,8 @@ TEST_F(ReplCoordHBV1Test,
hbResp.setConfig(rsConfig);
// The smallest valid optime in PV1.
OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()});
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1;
hbResp.addToBSON(&responseBuilder);
@@ -386,15 +386,20 @@ TEST_F(ReplCoordHBV1Test, IgnoreTheContentsOfMetadataWhenItsReplicaSetIdDoesNotM
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_PRIMARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min()});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min()});
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1;
hbResp.addToBSON(&responseBuilder);
- rpc::ReplSetMetadata metadata(
- opTime.getTerm(), opTime, opTime, rsConfig.getConfigVersion(), unexpectedId, 1, -1);
+ rpc::ReplSetMetadata metadata(opTime.getTerm(),
+ {opTime, Date_t::min()},
+ opTime,
+ rsConfig.getConfigVersion(),
+ unexpectedId,
+ 1,
+ -1);
uassertStatusOK(metadata.writeToMetadata(&responseBuilder));
heartbeatResponse = makeResponseStatus(responseBuilder.obj());
diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
index f7ceae5194d..e1490b238d2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
@@ -83,8 +83,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenReconfigReceivedWhileSecondary) {
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
BSONObjBuilder result;
ReplSetReconfigArgs args;
@@ -108,8 +108,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
BSONObjBuilder result;
@@ -152,8 +152,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
BSONObjBuilder result;
@@ -192,8 +192,8 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenReconfigReceivedWith
<< BSON("replicaSetId" << OID::gen())),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
BSONObjBuilder result;
@@ -233,8 +233,8 @@ TEST_F(ReplCoordTest,
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
BSONObjBuilder result;
@@ -314,8 +314,8 @@ TEST_F(ReplCoordTest,
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
Status status(ErrorCodes::InternalError, "Not Set");
@@ -333,8 +333,10 @@ TEST_F(ReplCoordTest,
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(5);
BSONObjBuilder respObj;
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
respObj << "ok" << 1;
hbResp.addToBSON(&respObj);
net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
@@ -357,8 +359,8 @@ TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenSavingANewConfigFailsDuringRe
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
Status status(ErrorCodes::InternalError, "Not Set");
@@ -386,8 +388,8 @@ TEST_F(ReplCoordTest,
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
Status status(ErrorCodes::InternalError, "Not Set");
@@ -426,8 +428,8 @@ TEST_F(ReplCoordTest, NodeReturnsConfigurationInProgressWhenReceivingAReconfigWh
init();
start(HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
// initiate
Status status(ErrorCodes::InternalError, "Not Set");
@@ -475,8 +477,8 @@ TEST_F(ReplCoordTest, PrimaryNodeAcceptsNewConfigWhenReceivingAReconfigWithAComp
<< BSON("replicaSetId" << OID::gen())),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
Status status(ErrorCodes::InternalError, "Not Set");
@@ -493,8 +495,10 @@ TEST_F(ReplCoordTest, PrimaryNodeAcceptsNewConfigWhenReceivingAReconfigWithAComp
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(2);
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
BSONObjBuilder respObj;
respObj << "ok" << 1;
hbResp.addToBSON(&respObj);
@@ -521,8 +525,8 @@ TEST_F(
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
@@ -552,8 +556,10 @@ TEST_F(
hbResp2.setConfigVersion(3);
hbResp2.setSetName("mySet");
hbResp2.setState(MemberState::RS_SECONDARY);
- hbResp2.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp2.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp2.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp2.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
BSONObjBuilder respObj2;
respObj2 << "ok" << 1;
hbResp2.addToBSON(&respObj2);
@@ -590,8 +596,8 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
@@ -626,8 +632,10 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi
hbResp.setConfigVersion(4);
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
BSONObjBuilder respObj2;
respObj2 << "ok" << 1;
hbResp.addToBSON(&respObj2);
@@ -661,8 +669,8 @@ TEST_F(ReplCoordTest, NodeAcceptsConfigFromAReconfigWithForceTrueWhileNotPrimary
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
// fail before forced
BSONObjBuilder result;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 84372bd0a9d..731e39e6edb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -129,8 +129,8 @@ TEST_F(ReplCoordTest, IsMasterIsFalseDuringStepdown) {
ReplSetConfig config = assertMakeRSConfig(configObj);
auto replCoord = getReplCoord();
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ASSERT(replCoord->getMemberState().primary());
@@ -417,7 +417,7 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) {
hbArgs.setHeartbeatVersion(1);
auto appliedTS = Timestamp(3, 3);
- replCoordSetMyLastAppliedOpTime(OpTime(appliedTS, 1));
+ replCoordSetMyLastAppliedOpTime(OpTime(appliedTS, 1), Date_t::min() + Seconds(100));
Status status(ErrorCodes::InternalError, "Not set");
stdx::thread prsiThread([&] { doReplSetInitiate(getReplCoord(), &status); });
@@ -429,8 +429,10 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) {
ASSERT_BSONOBJ_EQ(hbArgs.toBSON(), noi->getRequest().cmdObj);
ReplSetHeartbeatResponse hbResp;
hbResp.setConfigVersion(0);
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
getNet()->scheduleResponse(
noi, startDate + Milliseconds(10), RemoteCommandResponse(hbResp.toBSON(), Milliseconds(8)));
getNet()->runUntil(startDate + Milliseconds(10));
@@ -704,8 +706,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
// Become primary.
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ASSERT(getReplCoord()->getMemberState().primary());
@@ -741,8 +743,8 @@ TEST_F(ReplCoordTest,
<< 3))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTimeWithTermOne time1(100, 2);
@@ -758,8 +760,8 @@ TEST_F(ReplCoordTest,
ReplicationCoordinator::StatusAndDuration statusAndDur =
getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern);
ASSERT_OK(statusAndDur.status);
@@ -778,8 +780,8 @@ TEST_F(ReplCoordTest,
// 2 nodes waiting for time2
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2));
@@ -821,8 +823,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
<< 3))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTimeWithTermOne time1(100, 2);
@@ -839,8 +841,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
ReplicationCoordinator::StatusAndDuration statusAndDur =
getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, writeConcern);
ASSERT_OK(statusAndDur.status);
@@ -855,8 +857,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
// 2 nodes waiting for time2
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2));
@@ -892,8 +894,8 @@ TEST_F(ReplCoordTest,
<< "node4"))),
HostAndPort("node0"));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time1(Timestamp(100, 1), 1);
@@ -959,8 +961,8 @@ TEST_F(
<< BSON("dc" << 2 << "rack" << 3)))),
HostAndPort("node0"));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time1(Timestamp(100, 2), 1);
@@ -982,8 +984,8 @@ TEST_F(
auto opCtx = makeOperationContext();
// Nothing satisfied
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
ReplicationCoordinator::StatusAndDuration statusAndDur =
getReplCoord()->awaitReplication(opCtx.get(), time1, majorityWriteConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
@@ -1017,8 +1019,8 @@ TEST_F(
ASSERT_OK(statusAndDur.status);
// multiDC satisfied but not majority or multiRack
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
getReplCoord()->setLastAppliedOptime_forTest(2, 3, time2).transitional_ignore();
getReplCoord()->setLastDurableOptime_forTest(2, 3, time2).transitional_ignore();
@@ -1115,8 +1117,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
@@ -1132,8 +1134,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
awaiter.setOpTime(time1);
awaiter.setWriteConcern(writeConcern);
awaiter.start();
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_OK(statusAndDur.status);
@@ -1142,8 +1144,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
// 2 nodes waiting for time2
awaiter.setOpTime(time2);
awaiter.start();
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time2));
statusAndDur = awaiter.getResult();
ASSERT_OK(statusAndDur.status);
@@ -1179,8 +1181,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
@@ -1196,8 +1198,8 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
awaiter.setOpTime(time2);
awaiter.setWriteConcern(writeConcern);
awaiter.start();
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
{
NetworkInterfaceMock::InNetworkGuard inNet(getNet());
@@ -1230,8 +1232,8 @@ TEST_F(ReplCoordTest,
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
@@ -1280,8 +1282,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
const auto opCtx = makeOperationContext();
@@ -1322,8 +1324,8 @@ TEST_F(ReplCoordTest,
<< "node3"))),
HostAndPort("node1"));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
@@ -1387,9 +1389,12 @@ protected:
}
// Makes it so enough secondaries are caught up that a stepdown command can succeed.
- void catchUpSecondaries(const OpTime& desiredOpTime) {
+ void catchUpSecondaries(const OpTime& desiredOpTime, Date_t desiredWallTime = Date_t::min()) {
auto config = getReplCoord()->getConfig();
auto heartbeatInterval = config.getHeartbeatInterval();
+ if (desiredWallTime == Date_t::min() && !desiredOpTime.isNull()) {
+ desiredWallTime = Date_t::min() + Seconds(desiredOpTime.getSecs());
+ }
enterNetwork();
getNet()->runUntil(getNet()->now() + heartbeatInterval);
@@ -1402,8 +1407,8 @@ protected:
hbResp.setSetName(hbArgs.getSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(hbArgs.getConfigVersion());
- hbResp.setAppliedOpTime(desiredOpTime);
- hbResp.setDurableOpTime(desiredOpTime);
+ hbResp.setAppliedOpTimeAndWallTime({desiredOpTime, desiredWallTime});
+ hbResp.setDurableOpTimeAndWallTime({desiredOpTime, desiredWallTime});
BSONObjBuilder respObj;
respObj << "ok" << 1;
hbResp.addToBSON(&respObj);
@@ -1468,8 +1473,8 @@ TEST_F(ReplCoordTest, ElectionIdTracksTermInPV1) {
<< "protocolVersion"
<< 1),
HostAndPort("test1", 1234));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
@@ -1532,8 +1537,8 @@ TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppli
<< "protocolVersion"
<< 1),
HostAndPort("test1", 1234));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
@@ -1578,8 +1583,8 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
<< "protocolVersion"
<< 1),
HostAndPort("test1", 1234));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
@@ -1634,8 +1639,8 @@ TEST_F(ReplCoordTest, DrainCompletionMidStepDown) {
<< "protocolVersion"
<< 1),
HostAndPort("test1", 1234));
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
@@ -1671,8 +1676,8 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) {
OpTimeWithTermOne opTime1(100, 1);
OpTimeWithTermOne opTime2(200, 1);
- replCoordSetMyLastAppliedOpTime(opTime2);
- replCoordSetMyLastDurableOpTime(opTime2);
+ replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100));
// Secondaries not caught up yet.
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1));
@@ -1691,7 +1696,8 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) {
long long configVersion = repl->getConfig().getConfigVersion();
UpdatePositionArgs updatePositionArgs;
- ASSERT_OK(updatePositionArgs.initialize(
+ ASSERT_OK(updatePositionArgsInitialize(
+ updatePositionArgs,
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
@@ -1701,16 +1707,24 @@ TEST_F(StepDownTest, StepDownCanCompleteBasedOnReplSetUpdatePositionAlone) {
<< 1
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime2.asOpTime().toBSON())
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()))
<< BSON(UpdatePositionArgs::kConfigVersionFieldName
<< configVersion
<< UpdatePositionArgs::kMemberIdFieldName
<< 2
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime1.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime1.asOpTime().toBSON())))));
+ << opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime1.asOpTime().getSecs()))))));
ASSERT_OK(repl->processReplSetUpdatePosition(updatePositionArgs, &configVersion));
@@ -1725,8 +1739,8 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) {
OpTimeWithTermOne opTime1(100, 1);
OpTimeWithTermOne opTime2(200, 1);
- replCoordSetMyLastAppliedOpTime(opTime2);
- replCoordSetMyLastDurableOpTime(opTime2);
+ replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100));
// Secondaries not caught up yet.
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1));
@@ -1812,8 +1826,8 @@ TEST_F(StepDownTestWithUnelectableNode,
OpTimeWithTermOne opTime1(100, 1);
OpTimeWithTermOne opTime2(200, 1);
- replCoordSetMyLastAppliedOpTime(opTime2);
- replCoordSetMyLastDurableOpTime(opTime2);
+ replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(opTime2, Date_t::min() + Seconds(100));
// No secondaries are caught up yet.
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1));
@@ -1834,7 +1848,8 @@ TEST_F(StepDownTestWithUnelectableNode,
long long configVersion = repl->getConfig().getConfigVersion();
UpdatePositionArgs catchupFirstSecondary;
- ASSERT_OK(catchupFirstSecondary.initialize(
+ ASSERT_OK(updatePositionArgsInitialize(
+ catchupFirstSecondary,
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
@@ -1844,16 +1859,24 @@ TEST_F(StepDownTestWithUnelectableNode,
<< 1
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime2.asOpTime().toBSON())
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()))
<< BSON(UpdatePositionArgs::kConfigVersionFieldName
<< configVersion
<< UpdatePositionArgs::kMemberIdFieldName
<< 2
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime1.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime1.asOpTime().toBSON())))));
+ << opTime1.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime1.asOpTime().getSecs()))))));
ASSERT_OK(repl->processReplSetUpdatePosition(catchupFirstSecondary, &configVersion));
@@ -1864,7 +1887,8 @@ TEST_F(StepDownTestWithUnelectableNode,
// there is an electable node, so stepDown will complete.
UpdatePositionArgs catchupOtherSecondary;
- ASSERT_OK(catchupOtherSecondary.initialize(
+ ASSERT_OK(updatePositionArgsInitialize(
+ catchupOtherSecondary,
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
@@ -1874,16 +1898,24 @@ TEST_F(StepDownTestWithUnelectableNode,
<< 1
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime2.asOpTime().toBSON())
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()))
<< BSON(UpdatePositionArgs::kConfigVersionFieldName
<< configVersion
<< UpdatePositionArgs::kMemberIdFieldName
<< 2
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs())
<< UpdatePositionArgs::kDurableOpTimeFieldName
- << opTime2.asOpTime().toBSON())))));
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(opTime2.asOpTime().getSecs()))))));
ASSERT_OK(repl->processReplSetUpdatePosition(catchupOtherSecondary, &configVersion));
@@ -1897,8 +1929,8 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
- replCoordSetMyLastAppliedOpTime(optime1);
- replCoordSetMyLastDurableOpTime(optime1);
+ replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -1916,8 +1948,8 @@ TEST_F(StepDownTest,
// Set up this test so that all nodes are caught up. This is necessary to exclude the false
// positive case where stepDown returns "ExceededTimeLimit", but not because it could not
// acquire the lock, but because it could not satisfy all stepdown conditions on time.
- replCoordSetMyLastAppliedOpTime(optime1);
- replCoordSetMyLastDurableOpTime(optime1);
+ replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -1955,8 +1987,16 @@ protected:
*/
void simulateHeartbeatResponses(OpTime optimePrimary,
OpTime optimeLagged,
- int numNodesCaughtUp) {
+ int numNodesCaughtUp,
+ Date_t wallTimePrimary = Date_t::min(),
+ Date_t wallTimeLagged = Date_t::min()) {
int hbNum = 1;
+ if (wallTimePrimary == Date_t::min()) {
+ wallTimePrimary = wallTimePrimary + Seconds(optimePrimary.getSecs());
+ }
+ if (wallTimeLagged == Date_t::min()) {
+ wallTimeLagged = wallTimeLagged + Seconds(optimeLagged.getSecs());
+ }
while (getNet()->hasReadyRequests()) {
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
@@ -1972,13 +2012,15 @@ protected:
// Catch up 'numNodesCaughtUp' nodes out of 5.
OpTime optimeResponse = (hbNum <= numNodesCaughtUp) ? optimePrimary : optimeLagged;
+ Date_t wallTimeResponse =
+ (hbNum <= numNodesCaughtUp) ? wallTimePrimary : wallTimeLagged;
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(hbArgs.getSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(hbArgs.getConfigVersion());
- hbResp.setDurableOpTime(optimeResponse);
- hbResp.setAppliedOpTime(optimeResponse);
+ hbResp.setDurableOpTimeAndWallTime({optimeResponse, wallTimeResponse});
+ hbResp.setAppliedOpTimeAndWallTime({optimeResponse, wallTimeResponse});
BSONObjBuilder respObj;
respObj << "ok" << 1;
hbResp.addToBSON(&respObj);
@@ -2018,8 +2060,8 @@ TEST_F(StepDownTestFiveNode,
OpTime optimePrimary(Timestamp(100, 2), 1);
// All nodes are caught up
- replCoordSetMyLastAppliedOpTime(optimePrimary);
- replCoordSetMyLastDurableOpTime(optimePrimary);
+ replCoordSetMyLastAppliedOpTime(optimePrimary, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optimePrimary, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optimeLagged));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optimeLagged));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 3, optimeLagged));
@@ -2055,8 +2097,8 @@ TEST_F(
OpTime optimePrimary(Timestamp(100, 2), 1);
// All nodes are caught up
- replCoordSetMyLastAppliedOpTime(optimePrimary);
- replCoordSetMyLastDurableOpTime(optimePrimary);
+ replCoordSetMyLastAppliedOpTime(optimePrimary, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optimePrimary, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optimeLagged));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optimeLagged));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 3, optimeLagged));
@@ -2144,8 +2186,8 @@ TEST_F(ReplCoordTest, SingleNodeReplSetUnfreeze) {
// Become Secondary.
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
@@ -2248,8 +2290,8 @@ TEST_F(StepDownTest,
OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2286,8 +2328,8 @@ TEST_F(StepDownTest,
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2298,7 +2340,7 @@ TEST_F(StepDownTest,
// Step down where the secondary actually has to catch up before the stepDown can succeed.
auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
- catchUpSecondaries(optime2);
+ catchUpSecondaries(optime2, Date_t::min() + Seconds(optime2.getSecs()));
ASSERT_OK(*result.second.get());
ASSERT_TRUE(repl->getMemberState().secondary());
@@ -2311,8 +2353,8 @@ TEST_F(StepDownTest,
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2356,8 +2398,8 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2378,8 +2420,8 @@ TEST_F(StepDownTest, OnlyOneStepDownCmdIsAllowedAtATime) {
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2415,8 +2457,8 @@ TEST_F(StepDownTest, UnconditionalStepDownFailsStepDownCommand) {
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2448,8 +2490,8 @@ TEST_F(StepDownTest, InterruptingStepDownCommandRestoresWriteAvailability) {
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2500,8 +2542,8 @@ TEST_F(StepDownTest, InterruptingAfterUnconditionalStepdownDoesNotRestoreWriteAv
// No secondary is caught up
auto repl = getReplCoord();
- replCoordSetMyLastAppliedOpTime(optime2);
- replCoordSetMyLastDurableOpTime(optime2);
+ replCoordSetMyLastAppliedOpTime(optime2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime2, Date_t::min() + Seconds(100));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, optime1));
@@ -2595,8 +2637,8 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) {
OpTime optime1({2, 1}, 1);
OpTime optime2({100, 1}, 1);
OpTime optime3({100, 2}, 1);
- replCoordSetMyLastAppliedOpTime(optime1);
- replCoordSetMyLastDurableOpTime(optime1);
+ replCoordSetMyLastAppliedOpTime(optime1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime1, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime3));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(1, 2, optime3));
@@ -2672,8 +2714,8 @@ TEST_F(ReplCoordTest,
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// Can't unset maintenance mode if it was never set to begin with.
Status status = getReplCoord()->setMaintenanceMode(false);
@@ -2699,8 +2741,8 @@ TEST_F(ReplCoordTest,
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// valid set
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
@@ -2735,8 +2777,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// Can set multiple times
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
@@ -2769,8 +2811,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// We must take the RSTL in mode X before transitioning to RS_ROLLBACK.
const auto opCtx = makeOperationContext();
@@ -2815,8 +2857,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// Can't modify maintenance mode when PRIMARY
simulateSuccessfulV1Election();
@@ -2854,8 +2896,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
// TODO this election shouldn't have to happen.
simulateSuccessfulV1Election();
@@ -2924,8 +2966,8 @@ TEST_F(ReplCoordTest,
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 1, time1));
@@ -2969,8 +3011,8 @@ TEST_F(ReplCoordTest,
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2, false);
@@ -3063,7 +3105,7 @@ TEST_F(ReplCoordTest, IsMaster) {
time_t lastWriteDate = 100;
OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1);
- replCoordSetMyLastAppliedOpTime(opTime);
+ replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(100));
IsMasterResponse response;
getReplCoord()->fillIsMasterForReplSet(&response);
@@ -3125,8 +3167,8 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) {
time_t majorityWriteDate = lastWriteDate;
OpTime majorityOpTime = opTime;
- replCoordSetMyLastAppliedOpTime(opTime);
- replCoordSetMyLastDurableOpTime(opTime);
+ replCoordSetMyLastAppliedOpTime(opTime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(opTime, Date_t::min() + Seconds(100));
ASSERT_EQUALS(majorityOpTime, getReplCoord()->getCurrentCommittedSnapshotOpTime());
IsMasterResponse response;
@@ -3198,14 +3240,14 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -3219,17 +3261,23 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
// receive updatePosition containing ourself, should not process the update for self
UpdatePositionArgs args;
- ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 0
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << time2.toBSON()
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << time2.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 0
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs())
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs()))))));
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
@@ -3256,14 +3304,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -3271,17 +3319,23 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
// receive updatePosition with incorrect config version
UpdatePositionArgs args;
- ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 3
- << UpdatePositionArgs::kMemberIdFieldName
- << 1
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << time2.toBSON()
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << time2.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 3
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs())
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs()))))));
auto opCtx = makeOperationContext();
@@ -3313,14 +3367,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -3328,17 +3382,23 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
// receive updatePosition with nonexistent member id
UpdatePositionArgs args;
- ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 9
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << time2.toBSON()
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << time2.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 9
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs())
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << time2.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(time2.getSecs()))))));
auto opCtx = makeOperationContext();
@@ -3369,44 +3429,53 @@ TEST_F(ReplCoordTest,
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
OpTimeWithTermOne staleTime(10, 0);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastDurableOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time1, Date_t::min() + Seconds(100));
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern.wNumNodes = 1;
// receive a good update position
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
UpdatePositionArgs args;
- ASSERT_OK(
- args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 1
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << time2.asOpTime().toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << time2.asOpTime().toBSON())
- << BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 2
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << time2.asOpTime().toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << time2.asOpTime().toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << time2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(time2.asOpTime().getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << time2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(time2.asOpTime().getSecs()))
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << time2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(time2.asOpTime().getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << time2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(time2.asOpTime().getSecs()))))));
auto opCtx = makeOperationContext();
@@ -3468,8 +3537,8 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) {
disableSnapshots();
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTimeWithTermOne time(100, 2);
@@ -3561,8 +3630,8 @@ TEST_F(
<< 2))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 2), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTimeWithTermOne time(100, 2);
@@ -3635,15 +3704,15 @@ TEST_F(ReplCoordTest,
disableSnapshots();
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
OpTime time(Timestamp(100, 2), 1);
auto opCtx = makeOperationContext();
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time));
@@ -3720,8 +3789,8 @@ TEST_F(ReplCoordTest,
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
OpTime time(Timestamp(100, 1), 1);
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
WriteConcernOptions majorityWriteConcern;
@@ -3789,8 +3858,8 @@ TEST_F(ReplCoordTest,
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
OpTime zero(Timestamp(0, 0), 0);
OpTime time(Timestamp(100, 1), 1);
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
ASSERT_EQUALS(zero, getReplCoord()->getLastCommittedOpTime());
@@ -3809,8 +3878,8 @@ TEST_F(ReplCoordTest,
// Set a new, later OpTime.
OpTime newTime(Timestamp(100, 1), 1);
- replCoordSetMyLastAppliedOpTime(newTime);
- replCoordSetMyLastDurableOpTime(newTime);
+ replCoordSetMyLastAppliedOpTime(newTime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(newTime, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, newTime));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 3, newTime));
@@ -4019,16 +4088,16 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorage) {
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getStorageInterface()->allCommittedTimestamp = Timestamp(1, 1);
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
// Advance the commit point so it's higher than all the others.
- repl->advanceCommitPoint(OpTimeWithTermOne(10, 1), false);
+ replCoordAdvanceCommitPoint(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100), false);
ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp());
// Check that the stable timestamp is not updated if the all-committed timestamp is behind.
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 2));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 2), Date_t::min() + Seconds(100));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp());
@@ -4036,12 +4105,12 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorage) {
// Check that the stable timestamp is updated for the storage engine when we set the applied
// optime.
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1), Date_t::min() + Seconds(100));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp);
// Check that timestamp cleanup occurs.
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 2));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 2), Date_t::min() + Seconds(100));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(2, 2), stableTimestamp);
@@ -4087,7 +4156,7 @@ TEST_F(StableOpTimeTest, SetMyLastAppliedSetsStableOpTimeForStorageDisableMajori
// Check that the stable timestamp is updated for the storage engine when we set the applied
// optime, even though the last committed optime is unset.
getStorageInterface()->allCommittedTimestamp = Timestamp(1, 1);
- replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, 1));
+ replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, 1), Date_t::min() + Seconds(100));
ASSERT_EQUALS(Timestamp(1, 1), getStorageInterface()->getStableTimestamp());
}
@@ -4115,11 +4184,10 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) {
<< "test3:1234"))),
HostAndPort("test2", 1234));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(1, 1), Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
- auto repl = getReplCoord();
Timestamp stableTimestamp;
long long term = 2;
@@ -4127,24 +4195,30 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) {
getStorageInterface()->allCommittedTimestamp = Timestamp(2, 1);
// Add three stable optime candidates.
- replCoordSetMyLastAppliedOpTime(OpTime({2, 1}, term));
- replCoordSetMyLastAppliedOpTime(OpTime({2, 2}, term));
- replCoordSetMyLastAppliedOpTime(OpTime({3, 2}, term));
+ replCoordSetMyLastAppliedOpTime(OpTime({2, 1}, term), Date_t::min() + Seconds(1));
+ replCoordSetMyLastAppliedOpTime(OpTime({2, 2}, term), Date_t::min() + Seconds(2));
+ replCoordSetMyLastAppliedOpTime(OpTime({3, 2}, term), Date_t::min() + Seconds(3));
// Set a commit point and check the stable optime.
- repl->advanceCommitPoint(OpTime({2, 1}, term), false);
+ replCoordAdvanceCommitPoint(OpTime({2, 1}, term), Date_t::min() + Seconds(1), false);
+ ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime,
+ Date_t::min() + Seconds(1));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp);
// Check that the stable timestamp is not updated if the all-committed timestamp is behind.
- repl->advanceCommitPoint(OpTime({2, 2}, term), false);
+ replCoordAdvanceCommitPoint(OpTime({2, 2}, term), Date_t::min() + Seconds(2), false);
+ ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime,
+ Date_t::min() + Seconds(2));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(2, 1), stableTimestamp);
getStorageInterface()->allCommittedTimestamp = Timestamp(4, 4);
// Check that the stable timestamp is updated when we advance the commit point.
- repl->advanceCommitPoint(OpTime({3, 2}, term), false);
+ replCoordAdvanceCommitPoint(OpTime({3, 2}, term), Date_t::min() + Seconds(3), false);
+ ASSERT_EQUALS(getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime,
+ Date_t::min() + Seconds(3));
stableTimestamp = getStorageInterface()->getStableTimestamp();
ASSERT_EQUALS(Timestamp(3, 2), stableTimestamp);
@@ -4177,14 +4251,14 @@ TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) {
OpTime commitPoint = OpTime({1, 2}, term);
ASSERT_EQUALS(Timestamp::min(), getStorageInterface()->getStableTimestamp());
- replCoordSetMyLastAppliedOpTime(OpTime({0, 1}, term));
+ replCoordSetMyLastAppliedOpTime(OpTime({0, 1}, term), Date_t::min() + Seconds(100));
// Advance commit point when it has the same term as the last applied.
- repl->advanceCommitPoint(commitPoint, false);
+ replCoordAdvanceCommitPoint(commitPoint, Date_t::min() + Seconds(100), false);
- replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, term));
- replCoordSetMyLastAppliedOpTime(OpTime({1, 2}, term));
- replCoordSetMyLastAppliedOpTime(OpTime({1, 3}, term));
- replCoordSetMyLastAppliedOpTime(OpTime({1, 4}, term));
+ replCoordSetMyLastAppliedOpTime(OpTime({1, 1}, term), Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(OpTime({1, 2}, term), Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(OpTime({1, 3}, term), Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(OpTime({1, 4}, term), Date_t::min() + Seconds(100));
// The stable timestamp should be equal to the commit point timestamp.
const Timestamp stableTimestamp = getStorageInterface()->getStableTimestamp();
@@ -4207,7 +4281,8 @@ TEST_F(StableOpTimeTest, ClearOpTimeCandidatesPastCommonPointAfterRollback) {
ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, opTimeCandidates);
// Simulate a rollback to the common point.
- getExternalState()->setLastOpTimeAndWallTime(rollbackCommonPoint);
+ getExternalState()->setLastOpTimeAndWallTime(
+ rollbackCommonPoint, Date_t::min() + Seconds(rollbackCommonPoint.getSecs()));
repl->resetLastOpTimesFromOplog(opCtx.get(),
ReplicationCoordinator::DataConsistency::Inconsistent);
@@ -4237,14 +4312,16 @@ TEST_F(StableOpTimeTest, OpTimeCandidatesAreNotAddedWhenStateIsNotConsistent) {
// Set the lastApplied optime forward when data is consistent, and check that it was added to
// the candidate set.
replCoordSetMyLastAppliedOpTimeForward(consistentOpTime,
- ReplicationCoordinator::DataConsistency::Consistent);
+ ReplicationCoordinator::DataConsistency::Consistent,
+ Date_t::min() + Seconds(100));
ASSERT_EQUALS(consistentOpTime, repl->getMyLastAppliedOpTime());
ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
// Set the lastApplied optime forward when data is not consistent, and check that it wasn't
// added to the candidate set.
replCoordSetMyLastAppliedOpTimeForward(inconsistentOpTime,
- ReplicationCoordinator::DataConsistency::Inconsistent);
+ ReplicationCoordinator::DataConsistency::Inconsistent,
+ Date_t::min() + Seconds(100));
ASSERT_EQUALS(inconsistentOpTime, repl->getMyLastAppliedOpTime());
ASSERT_OPTIME_SET_EQ(expectedOpTimeCandidates, repl->getStableOpTimeCandidates_forTest());
}
@@ -4262,8 +4339,8 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
<< 0))),
HostAndPort("node1", 12345));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100));
auto opCtx = makeOperationContext();
@@ -4287,8 +4364,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
<< 0))),
HostAndPort("node1", 12345));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(10, 1), Date_t::min() + Seconds(100));
const auto opCtx = makeOperationContext();
killOperation(opCtx.get());
@@ -4328,8 +4405,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
<< 0))),
HostAndPort("node1", 12345));
- replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
- replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
+ replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTimeWithTermOne(100, 1), Date_t::min() + Seconds(100));
auto opCtx = makeOperationContext();
@@ -4352,8 +4429,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
OpTimeWithTermOne time(100, 1);
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
auto opCtx = makeOperationContext();
@@ -4400,8 +4477,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
auto opCtx = makeOperationContext();
runSingleNodeElection(opCtx.get());
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100));
shutdown(opCtx.get());
@@ -4425,8 +4502,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
const auto opCtx = makeOperationContext();
runSingleNodeElection(opCtx.get());
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(10, 1), 0), Date_t::min() + Seconds(100));
killOperation(opCtx.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
opCtx.get(),
@@ -4448,8 +4525,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) {
auto opCtx = makeOperationContext();
runSingleNodeElection(opCtx.get());
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
opCtx.get(),
@@ -4471,8 +4548,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) {
runSingleNodeElection(opCtx.get());
OpTime time(Timestamp(100, 1), 1);
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
opCtx.get(), ReadConcernArgs(time, ReadConcernLevel::kMajorityReadConcern)));
@@ -4492,13 +4569,13 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
auto opCtx = makeOperationContext();
runSingleNodeElection(opCtx.get());
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
OpTime committedOpTime(Timestamp(200, 1), 1);
auto pseudoLogOp = stdx::async(stdx::launch::async, [this, &committedOpTime]() {
// Not guaranteed to be scheduled after waitUntil blocks...
- replCoordSetMyLastAppliedOpTime(committedOpTime);
- replCoordSetMyLastDurableOpTime(committedOpTime);
+ replCoordSetMyLastAppliedOpTime(committedOpTime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(committedOpTime, Date_t::min() + Seconds(100));
});
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
@@ -4519,15 +4596,15 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
HostAndPort("node1", 12345));
auto opCtx = makeOperationContext();
runSingleNodeElection(opCtx.get());
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t::min() + Seconds(100));
OpTime opTimeToWait(Timestamp(100, 1), 1);
auto pseudoLogOp = stdx::async(stdx::launch::async, [this, &opTimeToWait]() {
// Not guaranteed to be scheduled after waitUntil blocks...
- replCoordSetMyLastAppliedOpTime(opTimeToWait);
- replCoordSetMyLastDurableOpTime(opTimeToWait);
+ replCoordSetMyLastAppliedOpTime(opTimeToWait, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(opTimeToWait, Date_t::min() + Seconds(100));
});
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
@@ -4590,34 +4667,38 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// lower configVersion
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 2)
- << "configVersion"
- << 1
- << "primaryIndex"
- << 2
- << "term"
- << 2
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 2)
+ << "configVersion"
+ << 1
+ << "primaryIndex"
+ << 2
+ << "term"
+ << 2
+ << "syncSourceIndex"
+ << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(0, getReplCoord()->getTerm());
// higher configVersion
- StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 2)
- << "configVersion"
- << 100
- << "primaryIndex"
- << 2
- << "term"
- << 2
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata2 = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 2)
+ << "configVersion"
+ << 100
+ << "primaryIndex"
+ << 2
+ << "term"
+ << 2
+ << "syncSourceIndex"
+ << 1)));
getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(0, getReplCoord()->getTerm());
}
@@ -4653,16 +4734,19 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer
OpTime time(Timestamp(10, 1), 1);
OpTime oldTime(Timestamp(9, 1), 1);
- replCoordSetMyLastAppliedOpTime(time);
+ Date_t wallTime = Date_t::min() + Seconds(10);
+ replCoordSetMyLastAppliedOpTime(time, wallTime);
// higher OpTime, should change
- getReplCoord()->advanceCommitPoint(time, false);
+ getReplCoord()->advanceCommitPoint({time, wallTime}, false);
ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime);
ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
// lower OpTime, should not change
- getReplCoord()->advanceCommitPoint(oldTime, false);
+ getReplCoord()->advanceCommitPoint({oldTime, Date_t::min() + Seconds(5)}, false);
ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime);
ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
}
TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurrentPrimaryIndex) {
@@ -4694,54 +4778,60 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
ASSERT_EQUALS(1, getReplCoord()->getTerm());
// higher term, should change
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 3)
- << "configVersion"
- << 2
- << "primaryIndex"
- << 2
- << "term"
- << 3
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 3)
+ << "configVersion"
+ << 2
+ << "primaryIndex"
+ << 2
+ << "term"
+ << 3
+ << "syncSourceIndex"
+ << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// lower term, should not change
- StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(11, 0) << "t" << 3)
- << "configVersion"
- << 2
- << "primaryIndex"
- << 1
- << "term"
- << 2
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata2 = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(11, 0) << "t" << 3)
+ << "configVersion"
+ << 2
+ << "primaryIndex"
+ << 1
+ << "term"
+ << 2
+ << "syncSourceIndex"
+ << 1)));
getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// same term, should not change
- StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(11, 0) << "t" << 3)
- << "configVersion"
- << 2
- << "primaryIndex"
- << 1
- << "term"
- << 3
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata3 = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(11, 0) << "t" << 3)
+ << "configVersion"
+ << 2
+ << "primaryIndex"
+ << 1
+ << "term"
+ << 3
+ << "syncSourceIndex"
+ << 1)));
getReplCoord()->processReplSetMetadata(metadata3.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
@@ -4776,18 +4866,20 @@ TEST_F(ReplCoordTest,
auto config = replCoord->getConfig();
// Higher term - should update term but not last committed optime.
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 3)
- << "configVersion"
- << config.getConfigVersion()
- << "primaryIndex"
- << 1
- << "term"
- << 3
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 3)
+ << "configVersion"
+ << config.getConfigVersion()
+ << "primaryIndex"
+ << 1
+ << "term"
+ << 3
+ << "syncSourceIndex"
+ << 1)));
BSONObjBuilder responseBuilder;
ASSERT_OK(metadata.getValue().writeToMetadata(&responseBuilder));
@@ -4839,16 +4931,17 @@ TEST_F(ReplCoordTest, LastCommittedOpTimeOnlyUpdatedFromHeartbeatWhenLastApplied
auto opTime1 = OpTime({10, 1}, 1);
auto opTime2 = OpTime({11, 1}, 2); // In higher term.
auto commitPoint = OpTime({15, 1}, 2);
- replCoordSetMyLastAppliedOpTime(opTime1);
+ replCoordSetMyLastAppliedOpTime(opTime1, Date_t::min() + Seconds(100));
// Node 1 is the current primary. The commit point has a higher term than lastApplied.
- rpc::ReplSetMetadata metadata(2, // term
- commitPoint, // committedOpTime
- commitPoint, // visibleOpTime
- config.getConfigVersion(),
- {}, // replset id
- 1, // currentPrimaryIndex,
- 1); // currentSyncSourceIndex
+ rpc::ReplSetMetadata metadata(
+ 2, // term
+ {commitPoint, Date_t::min() + Seconds(commitPoint.getSecs())}, // committed OpTime
+ commitPoint, // visibleOpTime
+ config.getConfigVersion(),
+ {}, // replset id
+ 1, // currentPrimaryIndex,
+ 1); // currentSyncSourceIndex
auto net = getNet();
BSONObjBuilder responseBuilder;
@@ -4877,7 +4970,7 @@ TEST_F(ReplCoordTest, LastCommittedOpTimeOnlyUpdatedFromHeartbeatWhenLastApplied
}
// Update lastApplied, so commit point can be advanced.
- replCoordSetMyLastAppliedOpTime(opTime2);
+ replCoordSetMyLastAppliedOpTime(opTime2, Date_t::min() + Seconds(100));
{
net->enterNetwork();
net->runUntil(net->now() + config.getHeartbeatInterval());
@@ -4919,16 +5012,17 @@ TEST_F(ReplCoordTest, LastCommittedOpTimeOnlyUpdatedFromHeartbeatInFCV42) {
auto lastAppliedOpTime = OpTime({11, 1}, 2);
auto commitPoint = OpTime({15, 1}, 2);
- replCoordSetMyLastAppliedOpTime(lastAppliedOpTime);
+ replCoordSetMyLastAppliedOpTime(lastAppliedOpTime, Date_t::min() + Seconds(100));
// Node 1 is the current primary.
- rpc::ReplSetMetadata metadata(2, // term
- commitPoint, // committedOpTime
- commitPoint, // visibleOpTime
- config.getConfigVersion(),
- {}, // replset id
- 1, // currentPrimaryIndex,
- 1); // currentSyncSourceIndex
+ rpc::ReplSetMetadata metadata(
+ 2, // term
+ {commitPoint, Date_t::min() + Seconds(commitPoint.getSecs())}, // committed OpTime
+ commitPoint, // visibleOpTime
+ config.getConfigVersion(),
+ {}, // replset id
+ 1, // currentPrimaryIndex,
+ 1); // currentSyncSourceIndex
auto net = getNet();
BSONObjBuilder responseBuilder;
@@ -4993,15 +5087,15 @@ TEST_F(ReplCoordTest, AdvanceCommitPointFromSyncSourceCanSetCommitPointToLastApp
HostAndPort("node1", 12345));
ASSERT_EQUALS(OpTime(), getReplCoord()->getLastCommittedOpTime());
- auto lastApplied = OpTime({10, 1}, 1);
- auto commitPoint = OpTime({15, 1}, 2);
- replCoordSetMyLastAppliedOpTime(lastApplied);
+ OpTimeAndWallTime lastApplied = {OpTime({10, 1}, 1), Date_t::min() + Seconds(10)};
+ OpTimeAndWallTime commitPoint = {OpTime({15, 1}, 2), Date_t::min() + Seconds(15)};
+ replCoordSetMyLastAppliedOpTime(lastApplied.opTime, lastApplied.wallTime);
const bool fromSyncSource = true;
getReplCoord()->advanceCommitPoint(commitPoint, fromSyncSource);
// The commit point can be set to lastApplied, even though lastApplied is in a lower term.
- ASSERT_EQUALS(lastApplied, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(lastApplied.opTime, getReplCoord()->getLastCommittedOpTime());
}
TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
@@ -5029,9 +5123,12 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
OpTime optime1{Timestamp(10, 0), 5};
OpTime optime2{Timestamp(11, 2), 5};
+ Date_t wallTime1 = Date_t::min() + Seconds(1);
+ Date_t wallTime2 = Date_t::min() + Seconds(2);
- replCoordSetMyLastAppliedOpTime(optime2);
- getReplCoord()->advanceCommitPoint(optime1, false);
+ replCoordSetMyLastAppliedOpTime(optime2, wallTime2);
+ // pass dummy Date_t to avoid advanceCommitPoint invariant
+ getReplCoord()->advanceCommitPoint({optime1, wallTime1}, false);
auto opCtx = makeOperationContext();
@@ -5044,17 +5141,19 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
BSONObj metadata = metadataBob.done();
log() << metadata;
- auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata);
+ auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata, /*requireWallTime*/ true);
ASSERT_OK(oqMetadata.getStatus());
- ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1);
+ ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted().opTime, optime1);
+ ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted().wallTime, wallTime1);
ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2);
ASSERT_EQ(oqMetadata.getValue().getRBID(), 100);
ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1);
ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1);
- auto replMetadata = rpc::ReplSetMetadata::readFromMetadata(metadata);
+ auto replMetadata = replReadFromMetadata(metadata);
ASSERT_OK(replMetadata.getStatus());
- ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1);
+ ASSERT_EQ(replMetadata.getValue().getLastOpCommitted().opTime, optime1);
+ ASSERT_EQ(replMetadata.getValue().getLastOpCommitted().wallTime, wallTime1);
ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime());
ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2);
ASSERT_EQ(replMetadata.getValue().getTerm(), 0);
@@ -5092,18 +5191,20 @@ TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter)
// Higher term - should update term and lastCommittedOpTime since arbiters learn of the
// commit point via heartbeats.
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 1) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 1) << "t" << 3)
- << "configVersion"
- << config.getConfigVersion()
- << "primaryIndex"
- << 1
- << "term"
- << 3
- << "syncSourceIndex"
- << 1)));
+ StatusWith<rpc::ReplSetMetadata> metadata = replReadFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 1) << "t" << 3) << "lastCommittedWall"
+ << Date_t::min() + Seconds(100)
+ << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 1) << "t" << 3)
+ << "configVersion"
+ << config.getConfigVersion()
+ << "primaryIndex"
+ << 1
+ << "term"
+ << 3
+ << "syncSourceIndex"
+ << 1)));
BSONObjBuilder responseBuilder;
ASSERT_OK(metadata.getValue().writeToMetadata(&responseBuilder));
@@ -5299,8 +5400,10 @@ TEST_F(ReplCoordTest,
hbResp.setConfigVersion(3);
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
net->runReadyNetworkOperations();
net->exitNetwork();
@@ -5353,8 +5456,10 @@ TEST_F(ReplCoordTest,
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_PRIMARY);
hbResp.setTerm(replCoord->getTerm());
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 1), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 1), 0), Date_t::min() + Seconds(100)});
hbResp.setConfigVersion(1);
// Heartbeat response is scheduled with a delay so that we can be sure that
@@ -5497,17 +5602,17 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastAppliedOpTime(time5);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(time5, Date_t::min() + Seconds(100));
// ensure current snapshot follows price is right rules (closest but not greater than)
- replCoordSetMyLastDurableOpTime(time3);
+ replCoordSetMyLastDurableOpTime(time3, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());
- replCoordSetMyLastDurableOpTime(time4);
+ replCoordSetMyLastDurableOpTime(time4, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());
- replCoordSetMyLastDurableOpTime(time5);
+ replCoordSetMyLastDurableOpTime(time5, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime());
}
@@ -5533,10 +5638,10 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) {
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- replCoordSetMyLastAppliedOpTime(time1);
- replCoordSetMyLastAppliedOpTime(time2);
- replCoordSetMyLastAppliedOpTime(time5);
- replCoordSetMyLastDurableOpTime(time5);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
+ replCoordSetMyLastAppliedOpTime(time5, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time5, Date_t::min() + Seconds(100));
// ensure dropping all snapshots should reset the current committed snapshot
getReplCoord()->dropAllSnapshots();
@@ -5561,11 +5666,11 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) {
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime());
- replCoordSetMyLastAppliedOpTime(time2);
+ replCoordSetMyLastAppliedOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime());
- replCoordSetMyLastDurableOpTime(time2);
+ replCoordSetMyLastDurableOpTime(time2, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());
}
@@ -5589,12 +5694,12 @@ TEST_F(ReplCoordTest,
OpTime time3(Timestamp(100, 3), term);
auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
- replCoordSetMyLastAppliedOpTimeForward(time3, consistency);
+ replCoordSetMyLastAppliedOpTimeForward(time3, consistency, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency);
- replCoordSetMyLastDurableOpTimeForward(time2);
+ replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTimeForward(time2, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time3, getReplCoord()->getMyLastAppliedOpTime());
}
@@ -5617,11 +5722,11 @@ DEATH_TEST_F(ReplCoordTest,
OpTime time2(Timestamp(99, 1), 2);
auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
// Since in pv1, oplog entries are ordered by non-decreasing
// term and strictly increasing timestamp, it leads to invariant failure.
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency);
+ replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100));
}
DEATH_TEST_F(ReplCoordTest,
@@ -5643,11 +5748,11 @@ DEATH_TEST_F(ReplCoordTest,
OpTime time2(Timestamp(100, 1), 2);
auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
// Since in pv1, oplog entries are ordered by non-decreasing
// term and strictly increasing timestamp, it leads to invariant failure.
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency);
+ replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100));
}
DEATH_TEST_F(ReplCoordTest,
@@ -5669,11 +5774,11 @@ DEATH_TEST_F(ReplCoordTest,
OpTime time2(Timestamp(100, 2), 0);
auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
// Since in pv1, oplog entries are ordered by non-decreasing
// term and strictly increasing timestamp, it leads to invariant failure.
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency);
+ replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100));
}
DEATH_TEST_F(ReplCoordTest,
@@ -5695,11 +5800,11 @@ DEATH_TEST_F(ReplCoordTest,
OpTime time2(Timestamp(100, 1), 0);
auto consistency = ReplicationCoordinator::DataConsistency::Consistent;
- replCoordSetMyLastAppliedOpTime(time1);
+ replCoordSetMyLastAppliedOpTime(time1, Date_t::min() + Seconds(100));
ASSERT_EQUALS(time1, getReplCoord()->getMyLastAppliedOpTime());
// Since in pv1, oplog entries are ordered by non-decreasing
// term and strictly increasing timestamp, it leads to invariant failure.
- replCoordSetMyLastAppliedOpTimeForward(time2, consistency);
+ replCoordSetMyLastAppliedOpTimeForward(time2, consistency, Date_t::min() + Seconds(100));
}
TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelievedToBeUp) {
@@ -5721,8 +5826,8 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve
<< BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)),
HostAndPort("test1", 1234));
OpTime optime(Timestamp(100, 2), 0);
- replCoordSetMyLastAppliedOpTime(optime);
- replCoordSetMyLastDurableOpTime(optime);
+ replCoordSetMyLastAppliedOpTime(optime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime, Date_t::min() + Seconds(100));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(1, 1, optime));
@@ -5795,21 +5900,30 @@ TEST_F(ReplCoordTest, UpdatePositionCmdHasMetadata) {
<< BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)),
HostAndPort("test1", 1234));
OpTime optime(Timestamp(100, 2), 0);
- replCoordSetMyLastAppliedOpTime(optime);
- replCoordSetMyLastDurableOpTime(optime);
+ replCoordSetMyLastAppliedOpTime(optime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(optime, Date_t::min() + Seconds(100));
auto opCtx = makeOperationContext();
- // Set last committed optime via metadata.
- rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
+ // Set last committed optime via metadata. Pass dummy Date_t to avoid advanceCommitPoint
+ // invariant.
+ rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(),
+ {optime, Date_t::min() + Seconds(optime.getSecs())},
+ optime,
+ 1,
+ OID(),
+ -1,
+ 1);
getReplCoord()->processReplSetMetadata(syncSourceMetadata);
- getReplCoord()->advanceCommitPoint(optime, true);
+ // Pass dummy Date_t to avoid advanceCommitPoint invariant.
+ getReplCoord()->advanceCommitPoint({optime, Date_t::min() + Seconds(optime.getSecs())}, true);
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand());
- auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
+ auto metadata = unittest::assertGet(replReadFromMetadata(cmd));
ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm());
ASSERT_EQUALS(metadata.getLastOpVisible(), optime);
- auto oqMetadataStatus = rpc::OplogQueryMetadata::readFromMetadata(cmd);
+ auto oqMetadataStatus =
+ rpc::OplogQueryMetadata::readFromMetadata(cmd, /*requireWallTime*/ true);
ASSERT_EQUALS(oqMetadataStatus.getStatus(), ErrorCodes::NoSuchKey);
}
@@ -5847,47 +5961,64 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
OpTime startingOpTime = OpTime(Timestamp(100, 1), 0);
- replCoordSetMyLastAppliedOpTime(startingOpTime);
- replCoordSetMyLastDurableOpTime(startingOpTime);
+ replCoordSetMyLastAppliedOpTime(startingOpTime, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(startingOpTime, Date_t::min() + Seconds(100));
// Receive notification that every node is up.
UpdatePositionArgs args;
- ASSERT_OK(
- args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 1
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())
- << BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 2
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())
- << BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 3
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())
- << BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 4
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 3
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 4
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))))));
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
// Become PRIMARY.
@@ -5895,26 +6026,36 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod
// Keep two nodes alive via UpdatePosition.
UpdatePositionArgs args1;
- ASSERT_OK(
- args1.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 1
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())
- << BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 2
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args1,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())))),
+ /*requireWallTime*/ true));
const Date_t startDate = getNet()->now();
getNet()->enterNetwork();
getNet()->runUntil(startDate + Milliseconds(100));
@@ -5952,18 +6093,23 @@ TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNod
// Keep one node alive via two methods (UpdatePosition and requestHeartbeat).
UpdatePositionArgs args2;
- ASSERT_OK(
- args2.initialize(BSON(UpdatePositionArgs::kCommandFieldName
- << 1
- << UpdatePositionArgs::kUpdateArrayFieldName
- << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
- << 2
- << UpdatePositionArgs::kMemberIdFieldName
- << 1
- << UpdatePositionArgs::kDurableOpTimeFieldName
- << startingOpTime.toBSON()
- << UpdatePositionArgs::kAppliedOpTimeFieldName
- << startingOpTime.toBSON())))));
+ ASSERT_OK(updatePositionArgsInitialize(
+ args2,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << 2
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs())
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << startingOpTime.toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << Date_t::min() + Seconds(startingOpTime.getSecs()))))));
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args2, 0));
hbArgs.setSetName("mySet");
@@ -6001,8 +6147,8 @@ TEST_F(ReplCoordTest, WaitForMemberState) {
HostAndPort("test1", 1234));
auto replCoord = getReplCoord();
auto initialTerm = replCoord->getTerm();
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
// Single node cluster - this node should start election on setFollowerMode() completion.
@@ -6038,8 +6184,8 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
HostAndPort("test1", 1234));
auto replCoord = getReplCoord();
auto initialTerm = replCoord->getTerm();
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(100));
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
// Single node cluster - this node should start election on setFollowerMode() completion.
@@ -6184,8 +6330,8 @@ TEST_F(ReplCoordTest, NodeStoresElectionVotes) {
HostAndPort("node1", 12345));
auto time = OpTimeWithTermOne(100, 1);
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
auto opCtx = makeOperationContext();
@@ -6238,8 +6384,8 @@ TEST_F(ReplCoordTest, NodeDoesNotStoreDryRunVotes) {
HostAndPort("node1", 12345));
auto time = OpTimeWithTermOne(100, 1);
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
auto opCtx = makeOperationContext();
@@ -6290,8 +6436,8 @@ TEST_F(ReplCoordTest, NodeFailsVoteRequestIfItFailsToStoreLastVote) {
HostAndPort("node1", 12345));
auto time = OpTimeWithTermOne(100, 1);
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
// Get our current term, as primary.
@@ -6351,8 +6497,8 @@ TEST_F(ReplCoordTest, NodeNodesNotGrantVoteIfInTerminalShutdown) {
HostAndPort("node1", 12345));
auto time = OpTimeWithTermOne(100, 1);
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- replCoordSetMyLastAppliedOpTime(time);
- replCoordSetMyLastDurableOpTime(time);
+ replCoordSetMyLastAppliedOpTime(time, Date_t::min() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(time, Date_t::min() + Seconds(100));
simulateSuccessfulV1Election();
// Get our current term, as primary.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 9f1ef793a58..79649d9b85f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -325,8 +325,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result)
void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {}
-void ReplicationCoordinatorMock::advanceCommitPoint(const OpTime& committedOptime,
- bool fromSyncSource) {}
+void ReplicationCoordinatorMock::advanceCommitPoint(
+ const OpTimeAndWallTime& committedOptimeAndWallTime, bool fromSyncSource) {}
void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {}
@@ -451,6 +451,10 @@ OpTime ReplicationCoordinatorMock::getLastCommittedOpTime() const {
return OpTime();
}
+OpTimeAndWallTime ReplicationCoordinatorMock::getLastCommittedOpTimeAndWallTime() const {
+ return {OpTime(), Date_t::min()};
+}
+
Status ReplicationCoordinatorMock::processReplSetRequestVotes(
OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 3389522810b..75052f1c8f7 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -189,7 +189,8 @@ public:
virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
- virtual void advanceCommitPoint(const OpTime& committedOptime, bool fromSyncSource) override;
+ virtual void advanceCommitPoint(const OpTimeAndWallTime& committedOptimeAndWallTime,
+ bool fromSyncSource) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -238,6 +239,8 @@ public:
virtual OpTime getLastCommittedOpTime() const;
+ virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const;
+
virtual std::vector<MemberData> getMemberData() const override;
virtual Status processReplSetRequestVotes(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index c788439d91d..f3d5cfb637f 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -231,8 +231,10 @@ void ReplCoordTest::simulateEnoughHeartbeatsForAllNodesUp() {
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(OpTime(Timestamp(100, 2), 0));
- hbResp.setDurableOpTime(OpTime(Timestamp(100, 2), 0));
+ hbResp.setAppliedOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 2), 0), Date_t::min() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime(
+ {OpTime(Timestamp(100, 2), 0), Date_t::min() + Seconds(100)});
BSONObjBuilder respObj;
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
} else {
@@ -334,8 +336,8 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e
hbResp.setState(MemberState::RS_SECONDARY);
// The smallest valid optime in PV1.
OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t::min() + Seconds(opTime.getSecs())});
hbResp.setConfigVersion(rsConfig.getConfigVersion());
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
} else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") {
@@ -387,8 +389,8 @@ void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) {
}
void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) {
- replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0));
- replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(1));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(1, 1), 0), Date_t::min() + Seconds(1));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->waitForElectionFinish_forTest();
@@ -428,8 +430,10 @@ bool ReplCoordTest::consumeHeartbeatV1(const NetworkInterfaceMock::NetworkOperat
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(lastApplied);
- hbResp.setDurableOpTime(lastApplied);
+ hbResp.setAppliedOpTimeAndWallTime(
+ {lastApplied, Date_t::min() + Seconds(lastApplied.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime(
+ {lastApplied, Date_t::min() + Seconds(lastApplied.getSecs())});
BSONObjBuilder respObj;
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
return true;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index db1406e1227..ab831e8820a 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -52,6 +52,7 @@ class ReplicationCoordinatorExternalStateMock;
class ReplicationCoordinatorImpl;
class StorageInterfaceMock;
class TopologyCoordinator;
+class UpdatePositionArgs;
using executor::NetworkInterfaceMock;
@@ -105,25 +106,57 @@ protected:
return _repl.get();
}
+ Status updatePositionArgsInitialize(UpdatePositionArgs& args,
+ const BSONObj& argsObj,
+ bool requireWallTime = true) {
+ return args.initialize(argsObj, requireWallTime);
+ }
+
+ StatusWith<rpc::ReplSetMetadata> replReadFromMetadata(const BSONObj& doc,
+ bool requireWallTime = true) {
+ return rpc::ReplSetMetadata::readFromMetadata(doc, requireWallTime);
+ }
+
void replCoordSetMyLastAppliedOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getReplCoord()->setMyLastAppliedOpTimeAndWallTime({opTime, wallTime});
}
void replCoordSetMyLastAppliedOpTimeForward(const OpTime& opTime,
ReplicationCoordinator::DataConsistency consistency,
Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getReplCoord()->setMyLastAppliedOpTimeAndWallTimeForward({opTime, wallTime}, consistency);
}
void replCoordSetMyLastDurableOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getReplCoord()->setMyLastDurableOpTimeAndWallTime({opTime, wallTime});
}
void replCoordSetMyLastDurableOpTimeForward(const OpTime& opTime,
Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getReplCoord()->setMyLastDurableOpTimeAndWallTimeForward({opTime, wallTime});
}
+ void replCoordAdvanceCommitPoint(const OpTime& opTime,
+ Date_t wallTime = Date_t::min(),
+ bool fromSyncSource = false) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
+ getReplCoord()->advanceCommitPoint({opTime, wallTime}, fromSyncSource);
+ }
+
/**
* Gets the storage interface.
*/
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index 5e7c852d211..6ad2390d3c0 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -54,7 +54,7 @@ const char kConfigVersionFieldName[] = "configVersion";
template <typename UpdatePositionArgsType>
long long _parseCommandRequestConfigVersion(const BSONObj& commandRequest) {
UpdatePositionArgsType args;
- if (!args.initialize(commandRequest).isOK()) {
+ if (!args.initialize(commandRequest, /*requireWallTime*/ false).isOK()) {
return -1;
}
if (args.updatesBegin() == args.updatesEnd()) {
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index fab0a0821b7..0c28ff717fb 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -72,8 +72,12 @@ public:
BSONObjBuilder entry(arrayBuilder.subobjStart());
itr.second.lastDurableOpTime.append(&entry,
UpdatePositionArgs::kDurableOpTimeFieldName);
+ entry.appendDate(UpdatePositionArgs::kDurableWallTimeFieldName,
+ Date_t::min() + Seconds(itr.second.lastDurableOpTime.getSecs()));
itr.second.lastAppliedOpTime.append(&entry,
UpdatePositionArgs::kAppliedOpTimeFieldName);
+ entry.appendDate(UpdatePositionArgs::kAppliedWallTimeFieldName,
+ Date_t::min() + Seconds(itr.second.lastAppliedOpTime.getSecs()));
entry.append(UpdatePositionArgs::kMemberIdFieldName, itr.first);
if (_configVersion != -1) {
entry.append(UpdatePositionArgs::kConfigVersionFieldName, _configVersion);
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 4b682455e06..31e0b8ea1a3 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -589,10 +589,10 @@ Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now,
response->setElectionTime(_electionTime);
}
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- const OpTime lastOpDurable = getMyLastDurableOpTime();
- response->setAppliedOpTime(lastOpApplied);
- response->setDurableOpTime(lastOpDurable);
+ const OpTimeAndWallTime lastOpApplied = getMyLastAppliedOpTimeAndWallTime();
+ const OpTimeAndWallTime lastOpDurable = getMyLastDurableOpTimeAndWallTime();
+ response->setAppliedOpTimeAndWallTime(lastOpApplied);
+ response->setDurableOpTimeAndWallTime(lastOpDurable);
if (_currentPrimaryIndex != -1) {
response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
@@ -977,6 +977,7 @@ void TopologyCoordinator::setMyLastAppliedOpTimeAndWallTime(OpTimeAndWallTime op
myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp());
}
+
myMemberData.setLastAppliedOpTimeAndWallTime(opTimeAndWallTime, now);
}
@@ -1053,10 +1054,11 @@ StatusWith<bool> TopologyCoordinator::setLastOptime(const UpdatePositionArgs::Up
<< memberData->getLastDurableOpTime() << "; updating to optime " << args.appliedOpTime
<< " and durable through " << args.durableOpTime;
-
- bool advancedOpTime = memberData->advanceLastAppliedOpTime(args.appliedOpTime, now);
- advancedOpTime =
- memberData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime;
+ bool advancedOpTime = memberData->advanceLastAppliedOpTimeAndWallTime(
+ {args.appliedOpTime, args.appliedWallTime}, now);
+ advancedOpTime = memberData->advanceLastDurableOpTimeAndWallTime(
+ {args.durableOpTime, args.durableWallTime}, now) ||
+ advancedOpTime;
return advancedOpTime;
}
@@ -1383,7 +1385,9 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
ReplSetHeartbeatResponse hbResponse;
hbResponse.setState(MemberState::RS_PRIMARY);
hbResponse.setElectionTime(electionTime);
- hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
+ hbResponse.setAppliedOpTimeAndWallTime(
+ {_memberData.at(primaryIndex).getHeartbeatAppliedOpTime(),
+ Date_t::min() + Seconds(1)});
hbResponse.setSyncingTo(HostAndPort());
_memberData.at(primaryIndex)
.setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
@@ -1592,12 +1596,16 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
// New optimes, to hold them all.
BSONObjBuilder optimes;
- _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
+ _lastCommittedOpTimeAndWallTime.opTime.append(&optimes, "lastCommittedOpTime");
+
+ if (_lastCommittedOpTimeAndWallTime.wallTime.isFormattable()) {
+ optimes.appendDate("lastCommittedWallTime", _lastCommittedOpTimeAndWallTime.wallTime);
+ }
+
if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
}
-
appendOpTime(&optimes, "appliedOpTime", lastOpApplied);
appendOpTime(&optimes, "durableOpTime", lastOpDurable);
@@ -1653,8 +1661,12 @@ StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand(
BSONObjBuilder entry(arrayBuilder.subobjStart());
memberData.getLastDurableOpTime().append(&entry,
UpdatePositionArgs::kDurableOpTimeFieldName);
+ entry.appendDate(UpdatePositionArgs::kDurableWallTimeFieldName,
+ memberData.getLastDurableWallTime());
memberData.getLastAppliedOpTime().append(&entry,
UpdatePositionArgs::kAppliedOpTimeFieldName);
+ entry.appendDate(UpdatePositionArgs::kAppliedWallTimeFieldName,
+ memberData.getLastAppliedWallTime());
entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
}
@@ -2381,7 +2393,7 @@ void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) {
_setLeaderMode(LeaderMode::kNotLeader);
}
-bool TopologyCoordinator::updateLastCommittedOpTime() {
+bool TopologyCoordinator::updateLastCommittedOpTimeAndWallTime() {
// If we're not primary or we're stepping down due to learning of a new term then we must not
// advance the commit point. If we are stepping down due to a user request, however, then it
// is safe to advance the commit point, and in fact we must since the stepdown request may be
@@ -2393,33 +2405,39 @@ bool TopologyCoordinator::updateLastCommittedOpTime() {
// Whether we use the applied or durable OpTime for the commit point is decided here.
const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();
- std::vector<OpTime> votingNodesOpTimes;
+ std::vector<OpTimeAndWallTime> votingNodesOpTimesAndWallTimes;
for (const auto& memberData : _memberData) {
int memberIndex = memberData.getConfigIndex();
invariant(memberIndex >= 0);
const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
if (memberConfig.isVoter()) {
- const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
- : memberData.getLastAppliedOpTime();
- votingNodesOpTimes.push_back(opTime);
+ const OpTimeAndWallTime durableOpTime = {memberData.getLastDurableOpTime(),
+ memberData.getLastDurableWallTime()};
+ const OpTimeAndWallTime appliedOpTime = {memberData.getLastAppliedOpTime(),
+ memberData.getLastAppliedWallTime()};
+ const OpTimeAndWallTime opTime = useDurableOpTime ? durableOpTime : appliedOpTime;
+ votingNodesOpTimesAndWallTimes.push_back(opTime);
}
}
- invariant(votingNodesOpTimes.size() > 0);
- if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
+ invariant(votingNodesOpTimesAndWallTimes.size() > 0);
+ if (votingNodesOpTimesAndWallTimes.size() <
+ static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
return false;
}
- std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
+ std::sort(votingNodesOpTimesAndWallTimes.begin(), votingNodesOpTimesAndWallTimes.end());
// need the majority to have this OpTime
- OpTime committedOpTime =
- votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
+ OpTimeAndWallTime committedOpTime =
+ votingNodesOpTimesAndWallTimes[votingNodesOpTimesAndWallTimes.size() -
+ _rsConfig.getWriteMajority()];
const bool fromSyncSource = false;
- return advanceLastCommittedOpTime(committedOpTime, fromSyncSource);
+ return advanceLastCommittedOpTimeAndWallTime(committedOpTime, fromSyncSource);
}
-bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime, bool fromSyncSource) {
+bool TopologyCoordinator::advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime committedOpTime,
+ bool fromSyncSource) {
if (_selfIndex == -1) {
// The config hasn't been installed or we are not in the config. This could happen
// on heartbeats before installing a config.
@@ -2427,43 +2445,48 @@ bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime, boo
}
// This check is performed to ensure primaries do not commit an OpTime from a previous term.
- if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
+ if (_iAmPrimary() && committedOpTime.opTime < _firstOpTimeOfMyTerm) {
LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
- << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
+ << committedOpTime.opTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
return false;
}
// Arbiters don't have data so they always advance their commit point via heartbeats.
if (!_selfConfig().isArbiter() &&
- getMyLastAppliedOpTime().getTerm() != committedOpTime.getTerm()) {
+ getMyLastAppliedOpTime().getTerm() != committedOpTime.opTime.getTerm()) {
if (fromSyncSource) {
- committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTime());
+ committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTimeAndWallTime());
} else {
LOG(1) << "Ignoring commit point with different term than my lastApplied, since it "
"may "
"not be on the same oplog branch as mine. optime: "
- << committedOpTime << ", my last applied: " << getMyLastAppliedOpTime();
+ << committedOpTime
+ << ", my last applied: " << getMyLastAppliedOpTimeAndWallTime();
return false;
}
}
- if (committedOpTime == _lastCommittedOpTime) {
+ if (committedOpTime.opTime == _lastCommittedOpTimeAndWallTime.opTime) {
return false; // Hasn't changed, so ignore it.
}
- if (committedOpTime < _lastCommittedOpTime) {
+ if (committedOpTime.opTime < _lastCommittedOpTimeAndWallTime.opTime) {
LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
- << ", currentCommittedOpTime: " << _lastCommittedOpTime;
+ << ", currentCommittedOpTime: " << _lastCommittedOpTimeAndWallTime;
return false;
}
- LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
- _lastCommittedOpTime = committedOpTime;
+ LOG(2) << "Updating _lastCommittedOpTimeAndWallTime to " << committedOpTime;
+ _lastCommittedOpTimeAndWallTime = committedOpTime;
return true;
}
OpTime TopologyCoordinator::getLastCommittedOpTime() const {
- return _lastCommittedOpTime;
+ return _lastCommittedOpTimeAndWallTime.opTime;
+}
+
+OpTimeAndWallTime TopologyCoordinator::getLastCommittedOpTimeAndWallTime() const {
+ return _lastCommittedOpTimeAndWallTime;
}
bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const {
@@ -2652,7 +2675,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(
rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
const OpTime& lastVisibleOpTime) const {
return rpc::ReplSetMetadata(_term,
- _lastCommittedOpTime,
+ _lastCommittedOpTimeAndWallTime,
lastVisibleOpTime,
_rsConfig.getConfigVersion(),
_rsConfig.getReplicaSetId(),
@@ -2661,7 +2684,7 @@ rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
}
rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
- return rpc::OplogQueryMetadata(_lastCommittedOpTime,
+ return rpc::OplogQueryMetadata(_lastCommittedOpTimeAndWallTime,
getMyLastAppliedOpTime(),
rbid,
_currentPrimaryIndex,
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 9b96f613053..b462c9a3251 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -230,7 +230,7 @@ public:
* the config getWriteConcernMajorityShouldJournal is set.
* Returns true if the _lastCommittedOpTime was changed.
*/
- bool updateLastCommittedOpTime();
+ bool updateLastCommittedOpTimeAndWallTime();
/**
* Updates _lastCommittedOpTime to be 'committedOpTime' if it is more recent than the current
@@ -239,13 +239,16 @@ public:
* 'fromSyncSource'=true, which guarantees we are on the same branch of history as
* 'committedOpTime', so we update our commit point to min(committedOpTime, lastApplied).
*/
- bool advanceLastCommittedOpTime(OpTime committedOpTime, bool fromSyncSource);
+ bool advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime committedOpTimeAndWallTime,
+ bool fromSyncSource);
/**
* Returns the OpTime of the latest majority-committed op known to this server.
*/
OpTime getLastCommittedOpTime() const;
+ OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const;
+
/**
* Returns true if it's safe to transition to LeaderMode::kMaster.
*/
@@ -935,7 +938,7 @@ private:
Date_t _electionSleepUntil;
// OpTime of the latest committed operation.
- OpTime _lastCommittedOpTime;
+ OpTimeAndWallTime _lastCommittedOpTimeAndWallTime;
// OpTime representing our transition to PRIMARY and the start of our term.
// _lastCommittedOpTime cannot be set to an earlier OpTime.
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index f7b9d212ecf..83ecf28486a 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -116,14 +116,20 @@ protected:
ASSERT_OK(getTopoCoord().completeTransitionToPrimary(dummyOpTime));
}
- void setMyOpTime(const OpTime& opTime, const Date_t wallTime = Date_t::min()) {
+ void setMyOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getTopoCoord().setMyLastAppliedOpTimeAndWallTime({opTime, wallTime}, now(), false);
}
void topoCoordSetMyLastAppliedOpTime(const OpTime& opTime,
Date_t now,
bool isRollbackAllowed,
- const Date_t wallTime = Date_t::min()) {
+ Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getTopoCoord().setMyLastAppliedOpTimeAndWallTime(
{opTime, wallTime}, now, isRollbackAllowed);
}
@@ -131,11 +137,23 @@ protected:
void topoCoordSetMyLastDurableOpTime(const OpTime& opTime,
Date_t now,
bool isRollbackAllowed,
- const Date_t wallTime = Date_t::min()) {
+ Date_t wallTime = Date_t::min()) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
getTopoCoord().setMyLastDurableOpTimeAndWallTime(
{opTime, wallTime}, now, isRollbackAllowed);
}
+ void topoCoordAdvanceLastCommittedOpTime(const OpTime& opTime,
+ Date_t wallTime = Date_t::min(),
+ const bool fromSyncSource = false) {
+ if (wallTime == Date_t::min()) {
+ wallTime = wallTime + Seconds(opTime.getSecs());
+ }
+ getTopoCoord().advanceLastCommittedOpTimeAndWallTime({opTime, wallTime}, fromSyncSource);
+ }
+
void setSelfMemberState(const MemberState& newState) {
getTopoCoord().changeMemberState_forTest(newState);
}
@@ -189,7 +207,7 @@ protected:
int primaryIndex = -1,
int syncSourceIndex = -1) {
return ReplSetMetadata(_topo->getTerm(),
- OpTime(),
+ OpTimeAndWallTime(),
visibleOpTime,
_currentConfig.getConfigVersion(),
OID(),
@@ -201,8 +219,10 @@ protected:
// Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
int primaryIndex = -1,
- int syncSourceIndex = -1) {
- return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+ int syncSourceIndex = -1,
+ Date_t lastCommittedWall = Date_t::min()) {
+ return OplogQueryMetadata(
+ {OpTime(), lastCommittedWall}, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
}
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
@@ -261,12 +281,20 @@ private:
Timestamp electionTime,
const OpTime& lastOpTimeSender,
Milliseconds roundTripTime,
- const HostAndPort& syncingTo) {
+ const HostAndPort& syncingTo,
+ Date_t lastDurableWallTime = Date_t::min(),
+ Date_t lastAppliedWallTime = Date_t::min()) {
+ if (lastDurableWallTime == Date_t::min()) {
+ lastDurableWallTime = lastDurableWallTime + Seconds(lastOpTimeSender.getSecs());
+ }
+ if (lastAppliedWallTime == Date_t::min()) {
+ lastAppliedWallTime = lastAppliedWallTime + Seconds(lastOpTimeSender.getSecs());
+ }
ReplSetHeartbeatResponse hb;
hb.setConfigVersion(1);
hb.setState(memberState);
- hb.setDurableOpTime(lastOpTimeSender);
- hb.setAppliedOpTime(lastOpTimeSender);
+ hb.setDurableOpTimeAndWallTime({lastOpTimeSender, lastDurableWallTime});
+ hb.setAppliedOpTimeAndWallTime({lastOpTimeSender, lastAppliedWallTime});
hb.setElectionTime(electionTime);
hb.setTerm(getTopoCoord().getTerm());
hb.setSyncingTo(syncingTo);
@@ -1599,8 +1627,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
Date_t curTime = heartbeatTime + uptimeSecs;
Timestamp electionTime(1, 2);
OpTime oplogProgress(Timestamp(3, 1), 20);
+ Date_t appliedWallTime = Date_t() + Seconds(oplogProgress.getSecs());
OpTime oplogDurable(Timestamp(1, 1), 19);
+ Date_t durableWallTime = Date_t() + Seconds(oplogDurable.getSecs());
+ ;
OpTime lastCommittedOpTime(Timestamp(5, 1), 20);
+ Date_t lastCommittedWallTime = Date_t() + Seconds(lastCommittedOpTime.getSecs());
OpTime readConcernMajorityOpTime(Timestamp(4, 1), 20);
Timestamp lastStableRecoveryTimestamp(2, 2);
Timestamp lastStableCheckpointTimestampDeprecated(2, 2);
@@ -1611,8 +1643,8 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
hb.setConfigVersion(1);
hb.setState(MemberState::RS_SECONDARY);
hb.setElectionTime(electionTime);
- hb.setDurableOpTime(oplogDurable);
- hb.setAppliedOpTime(oplogProgress);
+ hb.setDurableOpTimeAndWallTime({oplogDurable, durableWallTime});
+ hb.setAppliedOpTimeAndWallTime({oplogProgress, appliedWallTime});
StatusWith<ReplSetHeartbeatResponse> hbResponseGood = StatusWith<ReplSetHeartbeatResponse>(hb);
updateConfig(BSON("_id" << setName << "version" << 1 << "members"
@@ -1647,9 +1679,9 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
getTopoCoord().processHeartbeatResponse(
heartbeatTime, Milliseconds(4000), member, hbResponseGood);
makeSelfPrimary(electionTime);
- topoCoordSetMyLastAppliedOpTime(oplogProgress, startupTime, false);
- topoCoordSetMyLastDurableOpTime(oplogDurable, startupTime, false);
- getTopoCoord().advanceLastCommittedOpTime(lastCommittedOpTime, false);
+ topoCoordSetMyLastAppliedOpTime(oplogProgress, startupTime, false, appliedWallTime);
+ topoCoordSetMyLastDurableOpTime(oplogDurable, startupTime, false, durableWallTime);
+ topoCoordAdvanceLastCommittedOpTime(lastCommittedOpTime, lastCommittedWallTime, false);
// Now node 0 is down, node 1 is up, and for node 2 we have no heartbeat data yet.
BSONObjBuilder statusBuilder;
@@ -1681,8 +1713,11 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
ASSERT_BSONOBJ_EQ(readConcernMajorityOpTime.toBSON(),
optimes["readConcernMajorityOpTime"].Obj());
ASSERT_BSONOBJ_EQ(oplogProgress.toBSON(), optimes["appliedOpTime"].Obj());
+ ASSERT_EQUALS(appliedWallTime, optimes["lastAppliedWallTime"].Date());
ASSERT_BSONOBJ_EQ((oplogDurable).toBSON(), optimes["durableOpTime"].Obj());
+ ASSERT_EQUALS(durableWallTime, optimes["lastDurableWallTime"].Date());
ASSERT_BSONOBJ_EQ(lastCommittedOpTime.toBSON(), optimes["lastCommittedOpTime"].Obj());
+ ASSERT_EQUALS(lastCommittedWallTime, optimes["lastCommittedWallTime"].Date());
}
std::vector<BSONElement> memberArray = rsStatus["members"].Array();
ASSERT_EQUALS(4U, memberArray.size());
@@ -3626,7 +3661,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config of version 10.
- ReplSetMetadata replMetadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ReplSetMetadata replMetadata(0, {OpTime(), Date_t::min()}, OpTime(), 10, OID(), -1, -1);
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host4"), replMetadata, makeOplogQueryMetadata(), now()));
}
@@ -3665,9 +3700,12 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep
0);
ReplSetHeartbeatResponse hb;
- hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_PRIMARY), 0)
+ hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_PRIMARY),
+ 0,
+ /*requireWallTime*/ true)
.transitional_ignore();
- hb.setDurableOpTime(lastOpTimeApplied);
+ hb.setDurableOpTimeAndWallTime(
+ {lastOpTimeApplied, Date_t::min() + Seconds(lastOpTimeApplied.getSecs())});
hb.setElectionTime(election.getTimestamp());
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
@@ -3713,15 +3751,21 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) {
ReplSetHeartbeatResponse hb;
hb.initialize(BSON("ok" << 1 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON()
+ << "durableWallTime"
+ << Date_t::min() + Seconds(100)
<< "opTime"
<< OpTime(Timestamp(100, 0), 0).toBSON()
+ << "wallTime"
+ << Date_t::min() + Seconds(100)
<< "v"
<< 1
<< "state"
<< MemberState::RS_PRIMARY),
- 0)
+ 0,
+ /*requireWallTime*/ true)
.transitional_ignore();
- hb.setDurableOpTime(lastOpTimeApplied);
+ hb.setDurableOpTimeAndWallTime(
+ {lastOpTimeApplied, Date_t::min() + Seconds(lastOpTimeApplied.getSecs())});
hb.setElectionTime(election.getTimestamp());
StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
topoCoordSetMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false);
@@ -3882,14 +3926,16 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) {
setSelfMemberState(MemberState::RS_SECONDARY);
OpTime currentOptime(Timestamp(200, 1), 0);
+ Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs());
OpTime behindOptime(Timestamp(100, 1), 0);
+ Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs());
// Create a mock heartbeat response to be able to compare who is the freshest node.
// The latest heartbeat responses are looked at for determining the latest optime
// and therefore freshness for catchup takeover.
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse();
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(currentOptime);
+ hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime});
hbResp.setTerm(1);
Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z"));
@@ -3903,7 +3949,7 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) {
Milliseconds(999),
HostAndPort("host3:27017"),
StatusWith<ReplSetHeartbeatResponse>(hbResp));
- hbResp.setAppliedOpTime(behindOptime);
+ hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime});
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
@@ -3937,12 +3983,14 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) {
setSelfMemberState(MemberState::RS_SECONDARY);
OpTime currentOptime(Timestamp(200, 1), 0);
+ Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs());
OpTime behindOptime(Timestamp(100, 1), 0);
+ Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs());
// Create a mock heartbeat response to be able to compare who is the freshest node.
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse();
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(currentOptime);
+ hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime});
hbResp.setTerm(1);
Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z"));
@@ -3956,7 +4004,7 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) {
Milliseconds(999),
HostAndPort("host3:27017"),
StatusWith<ReplSetHeartbeatResponse>(hbResp));
- hbResp.setAppliedOpTime(behindOptime);
+ hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime});
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
@@ -3994,11 +4042,12 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) {
setSelfMemberState(MemberState::RS_SECONDARY);
OpTime currentOptime(Timestamp(200, 1), 0);
+ Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs());
// Create a mock heartbeat response to be able to compare who is the freshest node.
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse();
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(currentOptime);
+ hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime});
hbResp.setTerm(1);
Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z"));
@@ -4050,11 +4099,13 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
OpTime currentOptime(Timestamp(200, 1), 1);
OpTime behindOptime(Timestamp(100, 1), 0);
+ Date_t currentWallTime = Date_t::min() + Seconds(currentOptime.getSecs());
+ Date_t behindWallTime = Date_t::min() + Seconds(behindOptime.getSecs());
// Create a mock heartbeat response to be able to compare who is the freshest node.
ReplSetHeartbeatResponse hbResp = ReplSetHeartbeatResponse();
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setAppliedOpTime(currentOptime);
+ hbResp.setAppliedOpTimeAndWallTime({currentOptime, currentWallTime});
hbResp.setTerm(1);
Date_t firstRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T13:00Z"));
@@ -4070,7 +4121,7 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
Milliseconds(999),
HostAndPort("host3:27017"),
StatusWith<ReplSetHeartbeatResponse>(hbResp));
- hbResp.setAppliedOpTime(behindOptime);
+ hbResp.setAppliedOpTimeAndWallTime({behindOptime, behindWallTime});
hbResp.setState(MemberState::RS_PRIMARY);
getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
Milliseconds(999),
diff --git a/src/mongo/db/repl/update_position_args.cpp b/src/mongo/db/repl/update_position_args.cpp
index 1e164187844..70dad16f1ca 100644
--- a/src/mongo/db/repl/update_position_args.cpp
+++ b/src/mongo/db/repl/update_position_args.cpp
@@ -43,17 +43,26 @@ namespace repl {
const char UpdatePositionArgs::kCommandFieldName[] = "replSetUpdatePosition";
const char UpdatePositionArgs::kUpdateArrayFieldName[] = "optimes";
const char UpdatePositionArgs::kAppliedOpTimeFieldName[] = "appliedOpTime";
+const char UpdatePositionArgs::kAppliedWallTimeFieldName[] = "appliedWallTime";
const char UpdatePositionArgs::kDurableOpTimeFieldName[] = "durableOpTime";
+const char UpdatePositionArgs::kDurableWallTimeFieldName[] = "durableWallTime";
const char UpdatePositionArgs::kMemberIdFieldName[] = "memberId";
const char UpdatePositionArgs::kConfigVersionFieldName[] = "cfgver";
UpdatePositionArgs::UpdateInfo::UpdateInfo(const OpTime& applied,
+ const Date_t& appliedWall,
const OpTime& durable,
+ const Date_t& durableWall,
long long aCfgver,
long long aMemberId)
- : appliedOpTime(applied), durableOpTime(durable), cfgver(aCfgver), memberId(aMemberId) {}
-
-Status UpdatePositionArgs::initialize(const BSONObj& argsObj) {
+ : appliedOpTime(applied),
+ appliedWallTime(appliedWall),
+ durableOpTime(durable),
+ durableWallTime(durableWall),
+ cfgver(aCfgver),
+ memberId(aMemberId) {}
+
+Status UpdatePositionArgs::initialize(const BSONObj& argsObj, bool requireWallTime) {
// grab the array of changes
BSONElement updateArray;
Status status = bsonExtractTypedField(argsObj, kUpdateArrayFieldName, Array, &updateArray);
@@ -70,6 +79,24 @@ Status UpdatePositionArgs::initialize(const BSONObj& argsObj) {
if (!status.isOK())
return status;
+ Date_t appliedWallTime = Date_t::min();
+ BSONElement appliedWallTimeElement;
+ status = bsonExtractTypedField(
+ entry, kAppliedWallTimeFieldName, BSONType::Date, &appliedWallTimeElement);
+ if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime))
+ return status;
+ if (status.isOK())
+ appliedWallTime = appliedWallTimeElement.Date();
+
+ Date_t durableWallTime = Date_t::min();
+ BSONElement durableWallTimeElement;
+ status = bsonExtractTypedField(
+ entry, kDurableWallTimeFieldName, BSONType::Date, &durableWallTimeElement);
+ if (!status.isOK() && (status != ErrorCodes::NoSuchKey || requireWallTime))
+ return status;
+ if (status.isOK())
+ durableWallTime = durableWallTimeElement.Date();
+
OpTime durableOpTime;
status = bsonExtractOpTimeField(entry, kDurableOpTimeFieldName, &durableOpTime);
if (!status.isOK())
@@ -87,7 +114,8 @@ Status UpdatePositionArgs::initialize(const BSONObj& argsObj) {
if (!status.isOK())
return status;
- _updates.push_back(UpdateInfo(appliedOpTime, durableOpTime, cfgver, memberID));
+ _updates.push_back(UpdateInfo(
+ appliedOpTime, appliedWallTime, durableOpTime, durableWallTime, cfgver, memberID));
}
return Status::OK();
diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h
index f260c53d02f..37c9a536eed 100644
--- a/src/mongo/db/repl/update_position_args.h
+++ b/src/mongo/db/repl/update_position_args.h
@@ -48,18 +48,24 @@ public:
static const char kCommandFieldName[];
static const char kUpdateArrayFieldName[];
static const char kAppliedOpTimeFieldName[];
+ static const char kAppliedWallTimeFieldName[];
static const char kDurableOpTimeFieldName[];
+ static const char kDurableWallTimeFieldName[];
static const char kMemberIdFieldName[];
static const char kConfigVersionFieldName[];
struct UpdateInfo {
UpdateInfo(const OpTime& applied,
+ const Date_t& appliedWall,
const OpTime& durable,
+ const Date_t& durableWall,
long long aCfgver,
long long aMemberId);
OpTime appliedOpTime;
+ Date_t appliedWallTime;
OpTime durableOpTime;
+ Date_t durableWallTime;
long long cfgver;
long long memberId;
};
@@ -69,7 +75,7 @@ public:
/**
* Initializes this UpdatePositionArgs from the contents of "argsObj".
*/
- Status initialize(const BSONObj& argsObj);
+ Status initialize(const BSONObj& argsObj, bool requireWallTime);
/**
* Gets a begin iterator over the UpdateInfos stored in this UpdatePositionArgs.