diff options
author | Benety Goh <benety@mongodb.com> | 2015-10-09 19:30:29 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-10-10 08:54:15 -0400 |
commit | 69dec2fe8fed6d32ec4998ea7ec7ab063cb5b788 (patch) | |
tree | 560602199c9d66435a1eb705a0278bf786764bf0 /src | |
parent | d82d83f7100863fe93103200ba59ebfa966e91ee (diff) | |
download | mongo-69dec2fe8fed6d32ec4998ea7ec7ab063cb5b788.tar.gz |
SERVER-20832 restart heartbeats at most once per step down command
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 83 |
3 files changed, 92 insertions, 0 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 76ebd28cddc..c029db24838 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1269,6 +1269,7 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn, waitUntil, stepDownUntil, force, + true, // restartHeartbeats result)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { *result = cbh.getStatus(); @@ -1309,6 +1310,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue( const Date_t waitUntil, const Date_t stepDownUntil, bool force, + bool restartHeartbeats, Status* result) { if (cbData.status == ErrorCodes::CallbackCanceled) { // Cancelation only occurs on shutdown, which will also handle signaling the event. @@ -1385,6 +1387,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue( waitUntil, stepDownUntil, force, + false, // restartHeartbeats result)); if (!cbh.isOK()) { *result = cbh.getStatus(); @@ -1394,6 +1397,11 @@ void ReplicationCoordinatorImpl::_stepDownContinue( // We send out a fresh round of heartbeats because stepping down successfully without // {force: true} is dependent on timely heartbeat data. + // This callback is invoked every time a heartbeat response is processed so restart heartbeats + // only once. + if (!restartHeartbeats) { + return; + } stdx::lock_guard<stdx::mutex> lk(_mutex); _cancelHeartbeats(); _startHeartbeats_inlock(cbData); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index b142ed2d1cf..41ee5045be7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -670,6 +670,7 @@ private: Date_t waitUntil, Date_t stepdownUntil, bool force, + bool restartHeartbeats, Status* result); OID _getMyRID_inlock() const; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index db7193bc958..674f50bfc9d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1465,6 +1465,89 @@ TEST_F(StepDownTest, StepDownCatchUp) { getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj())); } while (getNet()->hasReadyRequests()) { + auto noi = getNet()->getNextReadyRequest(); + log() << "Blackholing network request " << noi->getRequest().cmdObj; + getNet()->blackHole(noi); + } + getNet()->runReadyNetworkOperations(); + exitNetwork(); + + getReplExec()->waitForEvent(eventHandle); + ASSERT_OK(result); + ASSERT_TRUE(repl->getMemberState().secondary()); +} + +TEST_F(StepDownTest, StepDownCatchUpOnSecondHeartbeat) { + OperationContextReplMock txn; + OpTimeWithTermZero optime1(100, 1); + OpTimeWithTermZero optime2(100, 2); + // No secondary is caught up + auto repl = getReplCoord(); + repl->setMyLastOptime(optime2); + ASSERT_OK(repl->setLastOptime_forTest(1, 1, optime1)); + ASSERT_OK(repl->setLastOptime_forTest(1, 2, optime1)); + + simulateSuccessfulV1Election(); + + // Step down where the secondary actually has to catch up before the stepDown can succeed. + // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for + // T + 2 seconds and send out a new round of heartbeats immediately. + // This makes it unnecessary to advance the clock after entering the network to process + // the heartbeat requests. + Status result(ErrorCodes::InternalError, "not mutated"); + auto globalReadLockAndEventHandle = + repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result); + const auto& eventHandle = globalReadLockAndEventHandle.second; + ASSERT_TRUE(eventHandle); + ASSERT_TRUE(txn.lockState()->isReadLocked()); + + // Secondary has not caught up on first round of heartbeats. + enterNetwork(); + ASSERT(getNet()->hasReadyRequests()); + NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); + RemoteCommandRequest request = noi->getRequest(); + log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj; + ReplSetHeartbeatArgsV1 hbArgs; + if (hbArgs.initialize(request.cmdObj).isOK()) { + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(hbArgs.getSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(hbArgs.getConfigVersion()); + BSONObjBuilder respObj; + respObj << "ok" << 1; + hbResp.addToBSON(&respObj, false); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj())); + } + while (getNet()->hasReadyRequests()) { + getNet()->blackHole(getNet()->getNextReadyRequest()); + } + getNet()->runReadyNetworkOperations(); + exitNetwork(); + + auto config = getReplCoord()->getConfig(); + auto heartbeatInterval = config.getHeartbeatInterval(); + + // Make a secondary actually catch up + enterNetwork(); + auto until = getNet()->now() + heartbeatInterval; + getNet()->runUntil(until); + ASSERT_EQUALS(until, getNet()->now()); + ASSERT(getNet()->hasReadyRequests()); + noi = getNet()->getNextReadyRequest(); + request = noi->getRequest(); + log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj; + if (hbArgs.initialize(request.cmdObj).isOK()) { + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(hbArgs.getSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(hbArgs.getConfigVersion()); + hbResp.setOpTime(optime2); + BSONObjBuilder respObj; + respObj << "ok" << 1; + hbResp.addToBSON(&respObj, false); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj())); + } + while (getNet()->hasReadyRequests()) { getNet()->blackHole(getNet()->getNextReadyRequest()); } getNet()->runReadyNetworkOperations(); |