Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 326
1 file changed, 100 insertions(+), 226 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 7588fb166d5..1075a7e9232 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- simulateCatchUpAbort();
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -223,6 +223,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
+ // Wait for catchup check to finish.
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -1278,19 +1280,15 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
class PrimaryCatchUpTest : public ReplCoordTest {
protected:
using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
- using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>;
+ using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
- const Timestamp smallTimestamp{1, 1};
-
- executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) {
+ void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) {
ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest();
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
- return makeResponseStatus(hbResp.toBSON(true));
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true)));
}
void simulateSuccessfulV1Voting() {
@@ -1302,9 +1300,10 @@ protected:
log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
- // Process requests until we're primary but leave the heartbeats for the notification
+ bool hasReadyRequests = true;
+ // Process requests until we're primary and consume the heartbeats for the notification
// of election win. Exit immediately on unexpected requests.
- while (!replCoord->getMemberState().primary()) {
+ while (!replCoord->getMemberState().primary() || hasReadyRequests) {
log() << "Waiting on network in state " << replCoord->getMemberState();
net->enterNetwork();
if (net->now() < electionTimeoutWhen) {
@@ -1315,9 +1314,7 @@ protected:
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- OpTime opTime(Timestamp(), getReplCoord()->getTerm());
- net->scheduleResponse(
- net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime));
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
} else if (request.cmdObj.firstElement().fieldNameStringData() ==
"replSetRequestVotes") {
net->scheduleResponse(net->getNextReadyRequest(),
@@ -1339,11 +1336,12 @@ protected:
// executor.
getReplExec()->waitForDBWork_forTest();
net->runReadyNetworkOperations();
+ hasReadyRequests = net->hasReadyRequests();
net->exitNetwork();
}
}
- ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) {
+ ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1358,8 +1356,7 @@ protected:
<< "protocolVersion"
<< 1
<< "settings"
- << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis"
- << (infiniteTimeout ? -1 : 5000)));
+ << BSON("catchUpTimeoutMillis" << 5000));
assertStartSuccess(configObj, HostAndPort("node1", 12345));
ReplSetConfig config = assertMakeRSConfig(configObj);
@@ -1381,15 +1378,17 @@ protected:
return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON())));
}
- void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) {
+ void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) {
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
while (net->hasReadyRequests()) {
const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
- if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- onHeartbeatRequest(noi);
+ if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+ onFreshnessScanRequest(noi);
+ } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ replyToHeartbeatRequestAsSecondaries(noi);
} else {
log() << "Black holing unexpected request to " << request.target << ": "
<< request.cmdObj;
@@ -1400,8 +1399,7 @@ protected:
net->exitNetwork();
}
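
[Editor's note] The new processFreshnessScanRequests() helper above replaces the heartbeat-only processHeartbeatRequests(): it drains every ready mock-network request, routes replSetGetStatus (the freshness scan) to the caller's callback, answers heartbeats as a secondary, and black-holes anything unexpected. A minimal standalone sketch of that dispatch shape is below; it uses only the standard library, and MockNet/MockRequest are hypothetical stand-ins for NetworkInterfaceMock, not MongoDB's API.

// --- Illustrative sketch (not MongoDB code) -------------------------------------------
// Models the dispatch shape of processFreshnessScanRequests(); command names follow the
// diff, every type and signature here is invented for the sketch.
#include <deque>
#include <functional>
#include <iostream>
#include <string>

struct MockRequest {
    std::string cmdName;  // first field of the command object, e.g. "replSetGetStatus"
    std::string target;   // host:port the request was sent to
};

struct MockNet {
    std::deque<MockRequest> ready;  // requests waiting for a scheduled response
    bool hasReadyRequests() const { return !ready.empty(); }
    MockRequest getNextReadyRequest() {
        MockRequest r = ready.front();
        ready.pop_front();
        return r;
    }
    void scheduleResponse(const MockRequest& r, const std::string& resp) {
        std::cout << "respond to " << r.target << ": " << resp << "\n";
    }
    void blackHole(const MockRequest& r) {
        std::cout << "black hole " << r.target << " " << r.cmdName << "\n";
    }
};

using FreshnessScanFn = std::function<void(const MockRequest&)>;

// Drain every ready request: freshness scans go to the test's callback, heartbeats get a
// canned "I am a secondary" reply, and anything unexpected is black holed.
void processFreshnessScanRequests(MockNet& net, const FreshnessScanFn& onFreshnessScan) {
    while (net.hasReadyRequests()) {
        MockRequest req = net.getNextReadyRequest();
        if (req.cmdName == "replSetGetStatus") {
            onFreshnessScan(req);
        } else if (req.cmdName == "replSetHeartbeat") {
            net.scheduleResponse(req, "{ state: SECONDARY }");
        } else {
            net.blackHole(req);
        }
    }
}

int main() {
    MockNet net;
    net.ready = {{"replSetGetStatus", "node2:12345"}, {"replSetHeartbeat", "node3:12345"}};
    processFreshnessScanRequests(net, [&](const MockRequest& r) {
        net.scheduleResponse(r, "{ optimes: { appliedOpTime: ... } }");
    });
    return 0;
}
// --------------------------------------------------------------------------------------
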
- // Response heartbeats with opTime until the given time. Exit if it sees any other request.
- void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) {
+ void replyHeartbeatsAndRunUntil(Date_t until) {
auto net = getNet();
net->enterNetwork();
while (net->now() < until) {
@@ -1409,10 +1407,9 @@ protected:
// Peek the next request
auto noi = net->getFrontOfUnscheduledQueue();
auto& request = noi->getRequest();
- log() << request.target << " at " << net->now() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
// Consume the next request
- onHeartbeatRequest(net->getNextReadyRequest());
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
} else {
// Cannot consume other requests than heartbeats.
net->exitNetwork();
@@ -1423,153 +1420,126 @@ protected:
}
net->exitNetwork();
}
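
[Editor's note] replyHeartbeatsAndRunUntil() above drops its callback parameter and now always answers heartbeats as a secondary while driving the mock clock toward a deadline (typically now() + catchUpTimeoutMillis), bailing out on any other request type. A standalone sketch of that run-until loop follows; MockClockNet and the millisecond clock are invented for the sketch and only approximate NetworkInterfaceMock's scheduling.

// --- Illustrative sketch (not MongoDB code) -------------------------------------------
// Advance a simulated clock toward a deadline, answering only heartbeat requests, and
// stop early on anything else.
#include <deque>
#include <iostream>
#include <string>

struct PendingRequest {
    std::string cmdName;
    long readyAtMillis;  // simulated time at which the request becomes ready
};

struct MockClockNet {
    long nowMillis = 0;
    std::deque<PendingRequest> queue;  // kept sorted by readyAtMillis in this toy model

    void runClockUntil(long t) { nowMillis = t; }
    void answerFrontAsSecondary() {
        std::cout << "heartbeat answered as secondary at t=" << nowMillis << "\n";
        queue.pop_front();
    }
};

// Returns false if a non-heartbeat request shows up before the deadline is reached.
bool replyHeartbeatsAndRunUntil(MockClockNet& net, long untilMillis) {
    while (net.nowMillis < untilMillis) {
        if (net.queue.empty() || net.queue.front().readyAtMillis > untilMillis) {
            net.runClockUntil(untilMillis);  // nothing to answer before the deadline
            break;
        }
        if (net.queue.front().cmdName != "replSetHeartbeat") {
            return false;  // cannot consume requests other than heartbeats
        }
        net.runClockUntil(net.queue.front().readyAtMillis);
        net.answerFrontAsSecondary();
    }
    return true;
}

int main() {
    MockClockNet net;
    net.queue = {{"replSetHeartbeat", 2000}, {"replSetHeartbeat", 4000}};
    // Analogous to running until now() + catchUpTimeoutMillis (5000 in the test config).
    std::cout << (replyHeartbeatsAndRunUntil(net, 5000) ? "reached deadline" : "stopped early")
              << " at t=" << net.nowMillis << "\n";
    return 0;
}
// --------------------------------------------------------------------------------------
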
-
- // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal
- // the optime waiter.
- void advanceMyLastAppliedOpTime(OpTime opTime) {
- getReplCoord()->setMyLastAppliedOpTime(opTime);
- getNet()->enterNetwork();
- getNet()->runReadyNetworkOperations();
- getNet()->exitNetwork();
- }
};
-// The first round of heartbeats indicates we are the most up-to-date.
-TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
+TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- int count = 0;
- processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) {
- count++;
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1));
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
});
-
- // Get 2 heartbeats from secondaries.
- ASSERT_EQUALS(2, count);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-// Heartbeats set a future target OpTime and we reached that successfully.
-TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
+TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+
+ auto net = getNet();
+ replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
- });
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
- startCapturingLogMessages();
-
- OpTime time1(Timestamp(100, 1), 0);
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // We should get caught up by the timeout time.
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
- getNet()->scheduleResponse(noi, getNet()->now(), status);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
- }
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ NetworkInterfaceMock* net = getNet();
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ getReplCoord()->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
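
[Editor's note] PrimaryCatchUpSucceeds exercises the core catch-up rule these tests assert: the freshness scan gives the new primary a target optime, the applier stays in Running until setMyLastAppliedOpTime() reaches that target, and only then does the node enter Draining (PrimaryDoNotNeedToCatchUp is the degenerate case where the scan returns nothing newer). Below is a toy model of that rule under those assumptions; CatchUpState and the flat uint64_t optimes are invented and stand in for the real OpTime/Timestamp machinery.

// --- Illustrative sketch (not MongoDB code) -------------------------------------------
// A toy model of the catch-up decision exercised by PrimaryCatchUpSucceeds and
// PrimaryDoNotNeedToCatchUp.
#include <cassert>
#include <cstdint>

enum class ApplierState { Running, Draining };

struct CatchUpState {
    bool scanDone = false;
    uint64_t targetOpTime = 0;  // latest optime learned from the freshness scan
    uint64_t myAppliedOpTime = 0;
    ApplierState applier = ApplierState::Running;

    void onFreshnessScanResponse(uint64_t remoteOpTime) {
        scanDone = true;
        if (remoteOpTime > targetOpTime) {
            targetOpTime = remoteOpTime;
        }
        maybeFinishCatchUp();
    }

    // What the bgsync/applier threads would do via setMyLastAppliedOpTime().
    void setMyLastAppliedOpTime(uint64_t opTime) {
        myAppliedOpTime = opTime;
        maybeFinishCatchUp();
    }

private:
    void maybeFinishCatchUp() {
        if (scanDone && myAppliedOpTime >= targetOpTime) {
            applier = ApplierState::Draining;  // caught up: enter drain mode
        }
    }
};

int main() {
    CatchUpState s;
    s.setMyLastAppliedOpTime(1001);   // stand-in for time1 = Timestamp(100, 1)
    s.onFreshnessScanResponse(1002);  // a secondary is ahead: keep the applier running
    assert(s.applier == ApplierState::Running);
    s.setMyLastAppliedOpTime(1002);   // reached the target: applier switches to Draining
    assert(s.applier == ApplierState::Draining);
    return 0;
}
// --------------------------------------------------------------------------------------
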
-TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // We should get caught up by the timeout time.
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- log() << "Black holing heartbeat from " << request.target.host();
- getNet()->blackHole(noi);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
- }
+
+ // The new primary learns of the latest OpTime.
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) {
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // Step down immediately.
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
+ });
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
auto opCtx = makeOperationContext();
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
@@ -1581,25 +1551,30 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // Step down in the middle of catchup.
- auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2;
- replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
+ auto net = getNet();
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ auto opCtx = makeOperationContext();
+ // Simulate the applier signaling replCoord to exit drain mode.
+ // At this point, we see the stepdown and reset the states.
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
+ ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -1611,17 +1586,24 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
auto net = getNet();
// The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ NetworkInterfaceMock* net = getNet();
ReplicationCoordinatorImpl* replCoord = getReplCoord();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ replCoord->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
// Step down during drain mode.
TopologyCoordinator::UpdateTermResult updateTermResult;
@@ -1635,10 +1617,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
simulateSuccessfulV1Voting();
ASSERT_TRUE(replCoord->getMemberState().primary());
- // No need to catch up, so we enter drain mode.
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+ // No need to catch-up, so we enter drain mode.
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
});
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
auto opCtx = makeOperationContext();
@@ -1652,113 +1633,6 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
- OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(200, 1), 0);
- OpTime time3(Timestamp(300, 1), 0);
- OpTime time4(Timestamp(400, 1), 0);
-
- // 1) The primary is at time 1 at the beginning.
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available.
- // So the target optime is time 3.
- startCapturingLogMessages();
- auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3;
- replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
- getNet()->scheduleResponse(noi, getNet()->now(), status);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
- }
- });
- // The node is still in catchup mode, but the target optime has been set.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
-
- // 3) Advancing its applied optime to time 2 isn't enough.
- advanceMyLastAppliedOpTime(time2);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
- startCapturingLogMessages();
- auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3;
- replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4));
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
- }
- });
- // The node is still in catchup mode, but the target optime has been updated.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
-
- // 5) Advancing to time 3 isn't enough now.
- advanceMyLastAppliedOpTime(time3);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // 6) The node catches up time 4 eventually.
- startCapturingLogMessages();
- advanceMyLastAppliedOpTime(time4);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-
-TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
- startCapturingLogMessages();
-
- OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(100, 2), 0);
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true);
-
- // Run time far forward and ensure we are still in catchup mode.
- // This is an arbitrary time 'far' into the future.
- auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10;
- replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
-
- // Simulate the heartbeats from secondaries to primary to update liveness info.
- // TODO(sz): Remove this after merging liveness info and heartbeats.
- const RemoteCommandRequest& request = noi->getRequest();
- ReplSetHeartbeatArgsV1 hbArgs;
- hbArgs.setConfigVersion(config.getConfigVersion());
- hbArgs.setSetName(config.getReplSetName());
- hbArgs.setSenderHost(request.target);
- hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId());
- hbArgs.setTerm(getReplCoord()->getTerm());
- ASSERT(hbArgs.isInitialized());
- ReplSetHeartbeatResponse response;
- ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
- });
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // Simulate a user initiated abort.
- ASSERT_OK(getReplCoord()->abortCatchupIfNeeded());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
-
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-
} // namespace
} // namespace repl
} // namespace mongo