summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Benety Goh <benety@mongodb.com> 2015-10-09 19:30:29 -0400
committer: Benety Goh <benety@mongodb.com> 2015-10-10 08:54:15 -0400
commit: 69dec2fe8fed6d32ec4998ea7ec7ab063cb5b788 (patch)
tree: 560602199c9d66435a1eb705a0278bf786764bf0 /src
parent: d82d83f7100863fe93103200ba59ebfa966e91ee (diff)
download: mongo-69dec2fe8fed6d32ec4998ea7ec7ab063cb5b788.tar.gz
SERVER-20832 restart heartbeats at most once per step down command
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 8
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 1
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_test.cpp | 83
3 files changed, 92 insertions, 0 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 76ebd28cddc..c029db24838 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1269,6 +1269,7 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn,
waitUntil,
stepDownUntil,
force,
+ true, // restartHeartbeats
result));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
*result = cbh.getStatus();
@@ -1309,6 +1310,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
const Date_t waitUntil,
const Date_t stepDownUntil,
bool force,
+ bool restartHeartbeats,
Status* result) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
// Cancelation only occurs on shutdown, which will also handle signaling the event.
@@ -1385,6 +1387,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
waitUntil,
stepDownUntil,
force,
+ false, // restartHeartbeats
result));
if (!cbh.isOK()) {
*result = cbh.getStatus();
@@ -1394,6 +1397,11 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
// We send out a fresh round of heartbeats because stepping down successfully without
// {force: true} is dependent on timely heartbeat data.
+ // This callback is re-invoked every time a heartbeat response is processed, so only the
+ // initial invocation from stepDown_nonBlocking (restartHeartbeats == true) restarts them.
+ if (!restartHeartbeats) {
+ return;
+ }
stdx::lock_guard<stdx::mutex> lk(_mutex);
_cancelHeartbeats();
_startHeartbeats_inlock(cbData);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index b142ed2d1cf..41ee5045be7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -670,6 +670,7 @@ private:
Date_t waitUntil,
Date_t stepdownUntil,
bool force,
+ bool restartHeartbeats,
Status* result);
OID _getMyRID_inlock() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index db7193bc958..674f50bfc9d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -1465,6 +1465,89 @@ TEST_F(StepDownTest, StepDownCatchUp) {
getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
}
while (getNet()->hasReadyRequests()) {
+ auto noi = getNet()->getNextReadyRequest();
+ log() << "Blackholing network request " << noi->getRequest().cmdObj;
+ getNet()->blackHole(noi);
+ }
+ getNet()->runReadyNetworkOperations();
+ exitNetwork();
+
+ getReplExec()->waitForEvent(eventHandle);
+ ASSERT_OK(result);
+ ASSERT_TRUE(repl->getMemberState().secondary());
+}
+
+TEST_F(StepDownTest, StepDownCatchUpOnSecondHeartbeat) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermZero optime2(100, 2);
+ // No secondary is caught up
+ auto repl = getReplCoord();
+ repl->setMyLastOptime(optime2);
+ ASSERT_OK(repl->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(repl->setLastOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulV1Election();
+
+ // Step down where the secondary actually has to catch up before the stepDown can succeed.
+ // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
+ // T + 2 seconds and send out a new round of heartbeats immediately.
+ // This makes it unnecessary to advance the clock after entering the network to process
+ // the heartbeat requests.
+ Status result(ErrorCodes::InternalError, "not mutated");
+ auto globalReadLockAndEventHandle =
+ repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result);
+ const auto& eventHandle = globalReadLockAndEventHandle.second;
+ ASSERT_TRUE(eventHandle);
+ ASSERT_TRUE(txn.lockState()->isReadLocked());
+
+ // Secondary has not caught up on first round of heartbeats.
+ enterNetwork();
+ ASSERT(getNet()->hasReadyRequests());
+ NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
+ RemoteCommandRequest request = noi->getRequest();
+ log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj;
+ ReplSetHeartbeatArgsV1 hbArgs;
+ if (hbArgs.initialize(request.cmdObj).isOK()) {
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(hbArgs.getSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(hbArgs.getConfigVersion());
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
+ }
+ while (getNet()->hasReadyRequests()) {
+ getNet()->blackHole(getNet()->getNextReadyRequest());
+ }
+ getNet()->runReadyNetworkOperations();
+ exitNetwork();
+
+ auto config = getReplCoord()->getConfig();
+ auto heartbeatInterval = config.getHeartbeatInterval();
+
+ // Make a secondary actually catch up
+ enterNetwork();
+ auto until = getNet()->now() + heartbeatInterval;
+ getNet()->runUntil(until);
+ ASSERT_EQUALS(until, getNet()->now());
+ ASSERT(getNet()->hasReadyRequests());
+ noi = getNet()->getNextReadyRequest();
+ request = noi->getRequest();
+ log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj;
+ if (hbArgs.initialize(request.cmdObj).isOK()) {
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(hbArgs.getSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(hbArgs.getConfigVersion());
+ hbResp.setOpTime(optime2);
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
+ }
+ while (getNet()->hasReadyRequests()) {
getNet()->blackHole(getNet()->getNextReadyRequest());
}
getNet()->runReadyNetworkOperations();