summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMaria van Keulen <maria@mongodb.com>2019-04-15 11:52:11 -0400
committerMaria van Keulen <maria@mongodb.com>2019-04-19 14:32:04 -0400
commit943e0fbfd8bf7223f03189bd8af1d5e423aa387f (patch)
tree30f14257c01d7c175c19b45e63b3ff252989fc1a /src/mongo
parente1e95afbd58a7449dd1765cb910b4d136f95fcc4 (diff)
downloadmongo-943e0fbfd8bf7223f03189bd8af1d5e423aa387f.tar.gz
SERVER-40659 Add regression tests for wall clock time transmission
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp30
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h10
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp89
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h10
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp85
-rw-r--r--src/mongo/rpc/metadata/oplog_query_metadata_test.cpp1
6 files changed, 195 insertions, 30 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 9b0e6ac8b68..a9ec92b7b71 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1518,16 +1518,17 @@ OpTime ReplicationCoordinatorImpl::_getMyLastDurableOpTime_inlock() const {
Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer,
long long memberId,
- const OpTime& opTime) {
+ const OpTime& opTime,
+ Date_t wallTime) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
- const UpdatePositionArgs::UpdateInfo update(OpTime(),
- Date_t::min(),
- opTime,
- Date_t::min() + Seconds(opTime.getSecs()),
- cfgVer,
- memberId);
+ if (wallTime == Date_t::min()) {
+ wallTime = Date_t() + Seconds(opTime.getSecs());
+ }
+
+ const UpdatePositionArgs::UpdateInfo update(
+ OpTime(), Date_t::min(), opTime, wallTime, cfgVer, memberId);
long long configVersion;
const auto status = _setLastOptime(lock, update, &configVersion);
_updateLastCommittedOpTimeAndWallTime(lock);
@@ -1536,16 +1537,17 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer
Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer,
long long memberId,
- const OpTime& opTime) {
+ const OpTime& opTime,
+ Date_t wallTime) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(getReplicationMode() == modeReplSet);
- const UpdatePositionArgs::UpdateInfo update(opTime,
- Date_t::min() + Seconds(opTime.getSecs()),
- OpTime(),
- Date_t::min(),
- cfgVer,
- memberId);
+ if (wallTime == Date_t::min()) {
+ wallTime = Date_t() + Seconds(opTime.getSecs());
+ }
+
+ const UpdatePositionArgs::UpdateInfo update(
+ opTime, wallTime, OpTime(), Date_t::min(), cfgVer, memberId);
long long configVersion;
const auto status = _setLastOptime(lock, update, &configVersion);
_updateLastCommittedOpTimeAndWallTime(lock);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 76a6bf693be..7b64b5760be 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -370,8 +370,14 @@ public:
/**
* Simple wrappers around _setLastOptime to make it easier to test.
*/
- Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
- Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
+ Status setLastAppliedOptime_forTest(long long cfgVer,
+ long long memberId,
+ const OpTime& opTime,
+ Date_t wallTime = Date_t::min());
+ Status setLastDurableOptime_forTest(long long cfgVer,
+ long long memberId,
+ const OpTime& opTime,
+ Date_t wallTime = Date_t::min());
/**
* Simple test wrappers that expose private methods.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 731e39e6edb..2e967fd781f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -1393,7 +1393,7 @@ protected:
auto config = getReplCoord()->getConfig();
auto heartbeatInterval = config.getHeartbeatInterval();
if (desiredWallTime == Date_t::min() && !desiredOpTime.isNull()) {
- desiredWallTime = Date_t::min() + Seconds(desiredOpTime.getSecs());
+ desiredWallTime = Date_t() + Seconds(desiredOpTime.getSecs());
}
enterNetwork();
@@ -1456,6 +1456,89 @@ TEST_F(ReplCoordTest, NodeReturnsBadValueWhenUpdateTermIsRunAgainstANonReplNode)
ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(opCtx.get(), 0).code());
}
+TEST_F(ReplCoordTest, UpdatePositionArgsAdvancesWallTimes) {
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234")
+ << BSON("_id" << 2 << "host"
+ << "test3:1234"))),
+ HostAndPort("test1", 1234));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ const auto repl = getReplCoord();
+ OpTimeWithTermOne opTime1(100, 1);
+ OpTimeWithTermOne opTime2(200, 1);
+
+ repl->setMyLastAppliedOpTimeAndWallTime({opTime2, Date_t() + Seconds(1)});
+ repl->setMyLastAppliedOpTimeAndWallTime({opTime2, Date_t() + Seconds(2)});
+
+ // Secondaries not caught up yet.
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 1, opTime1, Date_t()));
+ ASSERT_OK(repl->setLastAppliedOptime_forTest(1, 2, opTime1, Date_t()));
+
+ simulateSuccessfulV1Election();
+ ASSERT_TRUE(repl->getMemberState().primary());
+
+ // Catch up the secondaries using only replSetUpdatePosition.
+ long long configVersion = repl->getConfig().getConfigVersion();
+ UpdatePositionArgs updatePositionArgs;
+
+ Date_t memberOneAppliedWallTime = Date_t() + Seconds(3);
+ Date_t memberOneDurableWallTime = Date_t() + Seconds(4);
+ Date_t memberTwoAppliedWallTime = Date_t() + Seconds(5);
+ Date_t memberTwoDurableWallTime = Date_t() + Seconds(6);
+
+ ASSERT_OK(updatePositionArgsInitialize(
+ updatePositionArgs,
+ BSON(UpdatePositionArgs::kCommandFieldName
+ << 1
+ << UpdatePositionArgs::kUpdateArrayFieldName
+ << BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 1
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << memberOneAppliedWallTime
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << memberOneDurableWallTime)
+ << BSON(UpdatePositionArgs::kConfigVersionFieldName
+ << configVersion
+ << UpdatePositionArgs::kMemberIdFieldName
+ << 2
+ << UpdatePositionArgs::kAppliedOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kAppliedWallTimeFieldName
+ << memberTwoAppliedWallTime
+ << UpdatePositionArgs::kDurableOpTimeFieldName
+ << opTime2.asOpTime().toBSON()
+ << UpdatePositionArgs::kDurableWallTimeFieldName
+ << memberTwoDurableWallTime)))));
+
+ ASSERT_OK(repl->processReplSetUpdatePosition(updatePositionArgs, &configVersion));
+
+ // Make sure wall times are propagated through processReplSetUpdatePosition
+ auto memberDataVector = repl->getMemberData();
+ for (auto member : memberDataVector) {
+ if (member.getMemberId() == 1) {
+ ASSERT_EQ(member.getLastAppliedWallTime(), memberOneAppliedWallTime);
+ ASSERT_EQ(member.getLastDurableWallTime(), memberOneDurableWallTime);
+ } else if (member.getMemberId() == 2) {
+ ASSERT_EQ(member.getLastAppliedWallTime(), memberTwoAppliedWallTime);
+ ASSERT_EQ(member.getLastDurableWallTime(), memberTwoDurableWallTime);
+ }
+ }
+}
+
TEST_F(ReplCoordTest, ElectionIdTracksTermInPV1) {
init("mySet/test1:1234,test2:1234,test3:1234");
@@ -1992,10 +2075,10 @@ protected:
Date_t wallTimeLagged = Date_t::min()) {
int hbNum = 1;
if (wallTimePrimary == Date_t::min()) {
- wallTimePrimary = wallTimePrimary + Seconds(optimePrimary.getSecs());
+ wallTimePrimary = Date_t() + Seconds(optimePrimary.getSecs());
}
if (wallTimeLagged == Date_t::min()) {
- wallTimeLagged = wallTimeLagged + Seconds(optimeLagged.getSecs());
+ wallTimeLagged = Date_t() + Seconds(optimeLagged.getSecs());
}
while (getNet()->hasReadyRequests()) {
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index ab831e8820a..c7dcbcb8985 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -119,7 +119,7 @@ protected:
void replCoordSetMyLastAppliedOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getReplCoord()->setMyLastAppliedOpTimeAndWallTime({opTime, wallTime});
}
@@ -128,14 +128,14 @@ protected:
ReplicationCoordinator::DataConsistency consistency,
Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getReplCoord()->setMyLastAppliedOpTimeAndWallTimeForward({opTime, wallTime}, consistency);
}
void replCoordSetMyLastDurableOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getReplCoord()->setMyLastDurableOpTimeAndWallTime({opTime, wallTime});
}
@@ -143,7 +143,7 @@ protected:
void replCoordSetMyLastDurableOpTimeForward(const OpTime& opTime,
Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getReplCoord()->setMyLastDurableOpTimeAndWallTimeForward({opTime, wallTime});
}
@@ -152,7 +152,7 @@ protected:
Date_t wallTime = Date_t::min(),
bool fromSyncSource = false) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getReplCoord()->advanceCommitPoint({opTime, wallTime}, fromSyncSource);
}
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index c225c93a585..891374e118c 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -119,7 +119,7 @@ protected:
void setMyOpTime(const OpTime& opTime, Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getTopoCoord().setMyLastAppliedOpTimeAndWallTime({opTime, wallTime}, now(), false);
}
@@ -129,7 +129,7 @@ protected:
bool isRollbackAllowed,
Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getTopoCoord().setMyLastAppliedOpTimeAndWallTime(
{opTime, wallTime}, now, isRollbackAllowed);
@@ -140,7 +140,7 @@ protected:
bool isRollbackAllowed,
Date_t wallTime = Date_t::min()) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getTopoCoord().setMyLastDurableOpTimeAndWallTime(
{opTime, wallTime}, now, isRollbackAllowed);
@@ -150,7 +150,7 @@ protected:
Date_t wallTime = Date_t::min(),
const bool fromSyncSource = false) {
if (wallTime == Date_t::min()) {
- wallTime = wallTime + Seconds(opTime.getSecs());
+ wallTime = Date_t() + Seconds(opTime.getSecs());
}
getTopoCoord().advanceLastCommittedOpTimeAndWallTime({opTime, wallTime}, fromSyncSource);
}
@@ -286,10 +286,10 @@ private:
Date_t lastDurableWallTime = Date_t::min(),
Date_t lastAppliedWallTime = Date_t::min()) {
if (lastDurableWallTime == Date_t::min()) {
- lastDurableWallTime = lastDurableWallTime + Seconds(lastOpTimeSender.getSecs());
+ lastDurableWallTime = Date_t() + Seconds(lastOpTimeSender.getSecs());
}
if (lastAppliedWallTime == Date_t::min()) {
- lastAppliedWallTime = lastAppliedWallTime + Seconds(lastOpTimeSender.getSecs());
+ lastAppliedWallTime = Date_t() + Seconds(lastOpTimeSender.getSecs());
}
ReplSetHeartbeatResponse hb;
hb.setConfigVersion(1);
@@ -5241,6 +5241,79 @@ TEST_F(TopoCoordTest, CheckIfCommitQuorumCanBeSatisfied) {
}
}
+TEST_F(TopoCoordTest, AdvanceCommittedOpTimeDisregardsWallTimeOrder) {
+ // This test starts by configuring a TopologyCoordinator as a member of a 3 node replica
+ // set. The first and second nodes are secondaries, and the third is primary and corresponds
+ // to ourself.
+ Date_t startupTime = Date_t::fromMillisSinceEpoch(100);
+ Date_t heartbeatTime = Date_t::fromMillisSinceEpoch(5000);
+ Timestamp electionTime(1, 2);
+ OpTimeAndWallTime initialCommittedOpTimeAndWallTime = {OpTime(Timestamp(4, 1), 20),
+ Date_t() + Seconds(5)};
+ // Chronologically, the OpTime of lastCommittedOpTimeAndWallTime is more recent than that of
+ // initialCommittedOpTimeAndWallTime, even though the former's wall time is less recent than
+ // that of the latter.
+ OpTimeAndWallTime lastCommittedOpTimeAndWallTime = {OpTime(Timestamp(5, 1), 20),
+ Date_t() + Seconds(3)};
+ std::string setName = "mySet";
+
+ ReplSetHeartbeatResponse hb;
+ hb.setConfigVersion(1);
+ hb.setState(MemberState::RS_SECONDARY);
+ hb.setElectionTime(electionTime);
+ hb.setDurableOpTimeAndWallTime(initialCommittedOpTimeAndWallTime);
+ hb.setAppliedOpTimeAndWallTime(initialCommittedOpTimeAndWallTime);
+ StatusWith<ReplSetHeartbeatResponse> hbResponseGood = StatusWith<ReplSetHeartbeatResponse>(hb);
+
+ updateConfig(BSON("_id" << setName << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test0:1234")
+ << BSON("_id" << 1 << "host"
+ << "test1:1234")
+ << BSON("_id" << 2 << "host"
+ << "test2:1234"))),
+ 2,
+ startupTime + Milliseconds(1));
+
+ // Advance the commit point to initialCommittedOpTimeAndWallTime.
+ HostAndPort memberOne = HostAndPort("test0:1234");
+ getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(1), setName, memberOne);
+ getTopoCoord().processHeartbeatResponse(
+ startupTime + Milliseconds(2), Milliseconds(1), memberOne, hbResponseGood);
+
+ HostAndPort memberTwo = HostAndPort("test1:1234");
+ getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(2), setName, memberTwo);
+ getTopoCoord().processHeartbeatResponse(
+ heartbeatTime, Milliseconds(1), memberTwo, hbResponseGood);
+
+ makeSelfPrimary(electionTime);
+ getTopoCoord().setMyLastAppliedOpTimeAndWallTime(
+ initialCommittedOpTimeAndWallTime, startupTime, false);
+ getTopoCoord().setMyLastDurableOpTimeAndWallTime(
+ initialCommittedOpTimeAndWallTime, startupTime, false);
+ getTopoCoord().advanceLastCommittedOpTimeAndWallTime(initialCommittedOpTimeAndWallTime, false);
+ ASSERT_EQ(getTopoCoord().getLastCommittedOpTimeAndWallTime(),
+ initialCommittedOpTimeAndWallTime);
+
+ // memberOne's lastApplied and lastDurable OpTimeAndWallTimes are equal to
+ // lastCommittedOpTimeAndWallTime, but memberTwo's are equal to
+ // initialCommittedOpTimeAndWallTime. Only the ordering of OpTimes should influence advancing
+ // the commit point.
+ hb.setAppliedOpTimeAndWallTime(lastCommittedOpTimeAndWallTime);
+ hb.setDurableOpTimeAndWallTime(lastCommittedOpTimeAndWallTime);
+ StatusWith<ReplSetHeartbeatResponse> hbResponseGoodUpdated =
+ StatusWith<ReplSetHeartbeatResponse>(hb);
+ getTopoCoord().prepareHeartbeatRequestV1(heartbeatTime + Milliseconds(3), setName, memberOne);
+ getTopoCoord().processHeartbeatResponse(
+ heartbeatTime + Milliseconds(4), Milliseconds(1), memberOne, hbResponseGoodUpdated);
+ getTopoCoord().setMyLastAppliedOpTimeAndWallTime(
+ lastCommittedOpTimeAndWallTime, startupTime, false);
+ getTopoCoord().setMyLastDurableOpTimeAndWallTime(
+ lastCommittedOpTimeAndWallTime, startupTime, false);
+ getTopoCoord().updateLastCommittedOpTimeAndWallTime();
+ ASSERT_EQ(getTopoCoord().getLastCommittedOpTimeAndWallTime(), lastCommittedOpTimeAndWallTime);
+}
+
TEST_F(HeartbeatResponseTestV1,
ScheduleACatchupTakeoverWhenElectableAndReceiveHeartbeatFromPrimaryInCatchup) {
updateConfig(BSON("_id"
diff --git a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
index 859e8bbf10c..6c48f245da2 100644
--- a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
+++ b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
@@ -46,6 +46,7 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) {
OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1);
ASSERT_EQ(opTime1, metadata.getLastOpCommitted().opTime);
+ ASSERT_EQ(committedWall, metadata.getLastOpCommitted().wallTime);
ASSERT_EQ(opTime2, metadata.getLastOpApplied());
BSONObjBuilder builder;