author     XueruiFa <xuerui.fa@mongodb.com>                   2020-10-13 15:36:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-01-19 14:35:15 +0000
commit     bddffe1ebc6a14da54d47ebd3a1ba80bb2efaff8 (patch)
tree       b1935cf9b4eb76c8073152393b20f69d739147a5
parent     eeea2566fcc0bf90e2e9dfe97c3aa993dcf55d33 (diff)
download   mongo-bddffe1ebc6a14da54d47ebd3a1ba80bb2efaff8.tar.gz
SERVER-50318: Only cancel scheduled heartbeats
(cherry picked from commit 23ae68b0fecde9f0484dc276f376697d91fcc344)
7 files changed, 262 insertions, 74 deletions
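Before the per-file changes, a minimal, self-contained C++ sketch of the idea behind the patch may help. This is not MongoDB code: HeartbeatTracker, restartScheduled, and the integer handle are invented stand-ins for the executor::TaskExecutor handles used in the real change. The point it illustrates is that each tracked heartbeat records whether it is still scheduled or already sent, plus its target, and a "restart" cancels and reschedules only the still-scheduled entries, leaving in-flight requests to complete normally.

// Illustrative sketch only (assumed names, not the MongoDB API).
#include <functional>
#include <iostream>
#include <string>
#include <vector>

enum class HeartbeatState { kScheduled, kSent };

struct HeartbeatHandle {
    int id;               // stand-in for executor::TaskExecutor::CallbackHandle
    HeartbeatState state;
    std::string target;   // stand-in for HostAndPort
};

class HeartbeatTracker {
public:
    void track(int id, HeartbeatState state, std::string target) {
        _handles.push_back({id, state, std::move(target)});
    }

    // Cancel only handles still in the kScheduled state and immediately reschedule a
    // heartbeat to each affected target; kSent entries are left alone so their
    // responses are not discarded.
    void restartScheduled(const std::function<void(const std::string&)>& reschedule) {
        std::vector<std::string> restartedTargets;
        for (const auto& h : _handles) {
            if (h.state != HeartbeatState::kScheduled)
                continue;
            std::cout << "cancelling scheduled heartbeat to " << h.target << "\n";
            restartedTargets.push_back(h.target);
        }
        for (const auto& target : restartedTargets)
            reschedule(target);
    }

private:
    std::vector<HeartbeatHandle> _handles;
};

int main() {
    HeartbeatTracker tracker;
    tracker.track(1, HeartbeatState::kSent, "node2:12345");       // already on the wire
    tracker.track(2, HeartbeatState::kScheduled, "node3:12345");  // not yet sent
    tracker.restartScheduled([](const std::string& target) {
        std::cout << "rescheduling heartbeat to " << target << " now\n";
    });
    return 0;
}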
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index e65c59e28fd..0f8fd3dac3a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3959,8 +3959,13 @@ void ReplicationCoordinatorImpl::_postWonElectionUpdateMemberState(WithLock lk)
     invariant(_getMemberState_inlock().primary());
     // Clear the sync source.
     _onFollowerModeStateChange();
-    // Notify all secondaries of the election win.
-    _restartHeartbeats_inlock();
+
+    // Notify all secondaries of the election win by cancelling all current heartbeats and sending
+    // new heartbeat requests to all nodes. We must cancel and start instead of restarting scheduled
+    // heartbeats because all heartbeats must be restarted upon election succeeding.
+    _cancelHeartbeats_inlock();
+    _startHeartbeats_inlock();
+
     invariant(!_catchupState);
     _catchupState = std::make_unique<CatchupState>(this);
     _catchupState->start_inlock();
@@ -4475,7 +4480,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
     // of other members's state, allowing us to make informed sync source decisions.
     if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
         !_getMemberState_inlock().primary()) {
-        _restartHeartbeats_inlock();
+        _restartScheduledHeartbeats_inlock();
     }
 
     return newSyncSource;
@@ -4983,7 +4988,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
                   "Scheduling heartbeat to fetch a new config since we are not "
                   "a member of our current config",
                   "senderHost"_attr = senderHost);
-            _scheduleHeartbeatToTarget_inlock(senderHost, -1, now);
+            _scheduleHeartbeatToTarget_inlock(senderHost, now);
         }
     } else if (result.isOK() &&
               response->getConfigVersionAndTerm() < args.getConfigVersionAndTerm()) {
@@ -5008,8 +5013,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
         // will trigger reconfig, which cancels and reschedules all heartbeats.
         else if (args.hasSender()) {
             LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr);
-            int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
-            _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
+            _scheduleHeartbeatToTarget_inlock(senderHost, now);
         }
     } else if (result.isOK() && args.getPrimaryId() >= 0 &&
              (!response->hasPrimaryId() || response->getPrimaryId() != args.getPrimaryId())) {
@@ -5028,7 +5032,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
              "myPrimaryId"_attr = myPrimaryId,
              "senderAndPrimaryId"_attr = args.getPrimaryId(),
              "senderTerm"_attr = args.getTerm());
-        _restartHeartbeats_inlock();
+        _restartScheduledHeartbeats_inlock();
         }
     }
     return result;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 2fdc0645a02..574fc2d4913 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -671,7 +671,12 @@ private:
         std::multimap<OpTime, SharedWaiterHandle> _waiters;
     };
 
-    typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles;
+    enum class HeartbeatState { kScheduled = 0, kSent = 1 };
+    struct HeartbeatHandle {
+        executor::TaskExecutor::CallbackHandle handle;
+        HeartbeatState hbState;
+        HostAndPort target;
+    };
 
     // The state and logic of primary catchup.
     //
@@ -905,22 +910,21 @@ private:
                                bool isRollbackAllowed);
 
     /**
-     * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
-     * into the replica set config members array that corresponds to the "target", or -1 if
-     * "target" is not in _rsConfig.
+     * Schedules a heartbeat to be sent to "target" at "when".
      */
-    void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, int targetIndex, Date_t when);
+    void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, Date_t when);
 
     /**
      * Processes each heartbeat response.
      *
      * Schedules additional heartbeats, triggers elections and step downs, etc.
      */
-    void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
-                                  int targetIndex);
+    void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
 
     void _trackHeartbeatHandle_inlock(
-        const StatusWith<executor::TaskExecutor::CallbackHandle>& handle);
+        const StatusWith<executor::TaskExecutor::CallbackHandle>& handle,
+        HeartbeatState hbState,
+        const HostAndPort& target);
 
     void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle);
@@ -941,21 +945,17 @@ private:
     void _cancelHeartbeats_inlock();
 
     /**
-     * Cancels all heartbeats, then starts a heartbeat for each member in the current config.
-     * Called while holding replCoord _mutex.
+     * Cancels all heartbeats that have been scheduled but not yet sent out, then reschedules them
+     * at the current time immediately. Called while holding replCoord _mutex.
      */
-    void _restartHeartbeats_inlock();
+    void _restartScheduledHeartbeats_inlock();
 
     /**
-     * Asynchronously sends a heartbeat to "target". "targetIndex" is the index
-     * into the replica set config members array that corresponds to the "target", or -1 if
-     * we don't have a valid replica set config.
+     * Asynchronously sends a heartbeat to "target".
      *
      * Scheduled by _scheduleHeartbeatToTarget_inlock.
      */
-    void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
-                            const HostAndPort& target,
-                            int targetIndex);
+    void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, const HostAndPort& target);
 
     MemberState _getMemberState_inlock() const;
 
@@ -1448,7 +1448,7 @@ private:
     mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicationCoordinatorImpl::_mutex");  // (S)
 
     // Handles to actively queued heartbeats.
-    HeartbeatHandles _heartbeatHandles;  // (M)
+    std::vector<HeartbeatHandle> _heartbeatHandles;  // (M)
 
     // When this node does not know itself to be a member of a config, it adds
     // every host that sends it a heartbeat request to this set, and also starts
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index b7974ed6a3d..dc2cd0b86b5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -2246,6 +2246,81 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
     ASSERT(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
 }
 
+TEST_F(ReplCoordTest, MemberHbDataIsRestartedUponWinningElection) {
+    auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{
+        logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)};
+
+    auto replConfigBson = BSON("_id"
+                               << "mySet"
+                               << "protocolVersion" << 1 << "version" << 1 << "members"
+                               << BSON_ARRAY(BSON("_id" << 1 << "host"
+                                                        << "node1:12345")
+                                             << BSON("_id" << 2 << "host"
+                                                           << "node2:12345")
+                                             << BSON("_id" << 3 << "host"
+                                                           << "node3:12345")));
+
+    auto myHostAndPort = HostAndPort("node1", 12345);
+    assertStartSuccess(replConfigBson, myHostAndPort);
+    replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+    replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+    // Respond to heartbeat requests. This should update memberData for all nodes.
+    simulateEnoughHeartbeatsForAllNodesUp();
+    simulateSuccessfulDryRun();
+
+    // Verify that all other nodes were updated since restart.
+    auto memberData = getReplCoord()->getMemberData();
+    for (auto& member : memberData) {
+        // We should not have updated our own memberData.
+        if (member.isSelf()) {
+            continue;
+        }
+        ASSERT_TRUE(member.isUpdatedSinceRestart());
+    }
+
+    enterNetwork();
+    NetworkInterfaceMock* net = getNet();
+
+    // Advance clock time so that heartbeat requests are in SENT state.
+    auto config = getReplCoord()->getReplicaSetConfig_forTest();
+    net->advanceTime(net->now() + config.getHeartbeatInterval());
+
+    while (net->hasReadyRequests()) {
+        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+        const RemoteCommandRequest& request = noi->getRequest();
+        LOGV2(5031801,
+              "{request_target} processing {request_cmdObj}",
+              "request_target"_attr = request.target.toString(),
+              "request_cmdObj"_attr = request.cmdObj);
+        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
+            // Since the heartbeat requests are in SENT state, we will black hole them here to avoid
+            // responding to them.
+            net->blackHole(noi);
+        } else {
+            net->scheduleResponse(noi,
+                                  net->now(),
+                                  makeResponseStatus(BSON("ok" << 1 << "term" << 1 << "voteGranted"
+                                                               << true << "reason"
+                                                               << "")));
+        }
+        net->runReadyNetworkOperations();
+    }
+    exitNetwork();
+
+    getReplCoord()->waitForElectionFinish_forTest();
+    ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+    // Verify that the memberData for every node has not been updated since becoming primary. This
+    // can only happen if all heartbeat requests are restarted after the election, not just
+    // heartbeats in SCHEDULED state.
+    memberData = getReplCoord()->getMemberData();
+    for (auto& member : memberData) {
+        ASSERT_FALSE(member.isUpdatedSinceRestart());
+    }
+}
+
 class PrimaryCatchUpTest : public ReplCoordTest {
 protected:
     using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index ed0b0de939d..95447003152 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -92,8 +92,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() {
 }
 
 void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
-                                                    const HostAndPort& target,
-                                                    int targetIndex) {
+                                                    const HostAndPort& target) {
     stdx::lock_guard<Latch> lk(_mutex);
 
     _untrackHeartbeatHandle_inlock(cbData.myHandle);
@@ -113,7 +112,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call
         target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout);
     const executor::TaskExecutor::RemoteCommandCallbackFn callback =
         [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
-            return _handleHeartbeatResponse(cbData, targetIndex);
+            return _handleHeartbeatResponse(cbData);
         };
 
     LOGV2_FOR_HEARTBEATS(4615670,
@@ -123,11 +122,11 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call
                          "requestId"_attr = request.id,
                          "target"_attr = target,
                          "heartbeatObj"_attr = heartbeatObj);
-    _trackHeartbeatHandle_inlock(_replExecutor->scheduleRemoteCommand(request, callback));
+    _trackHeartbeatHandle_inlock(
+        _replExecutor->scheduleRemoteCommand(request, callback), HeartbeatState::kSent, target);
 }
 
 void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAndPort& target,
-                                                                   int targetIndex,
                                                                    Date_t when) {
     LOGV2_FOR_HEARTBEATS(4615618,
                          2,
@@ -135,10 +134,13 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd
                          "Scheduling heartbeat",
                          "target"_attr = target,
                          "when"_attr = when);
-    _trackHeartbeatHandle_inlock(_replExecutor->scheduleWorkAt(
-        when, [=](const executor::TaskExecutor::CallbackArgs& cbData) {
-            _doMemberHeartbeat(cbData, target, targetIndex);
-        }));
+    _trackHeartbeatHandle_inlock(
+        _replExecutor->scheduleWorkAt(when,
+                                      [=](const executor::TaskExecutor::CallbackArgs& cbData) {
+                                          _doMemberHeartbeat(cbData, target);
+                                      }),
+        HeartbeatState::kScheduled,
+        target);
 }
 
 void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj response,
@@ -159,14 +161,14 @@ void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj respons
             _replExecutor->now(), _rsConfig.getReplSetName(), request.target);
 
         // Pretend we sent a request so that _untrackHeartbeatHandle_inlock succeeds.
-        _trackHeartbeatHandle_inlock(handle);
+        _trackHeartbeatHandle_inlock(handle, HeartbeatState::kSent, request.target);
     }
 
-    _handleHeartbeatResponse(cbData, targetIndex);
+    _handleHeartbeatResponse(cbData);
 }
 
 void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
-    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
+    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
     stdx::unique_lock<Latch> lk(_mutex);
 
     // remove handle from queued heartbeats
@@ -300,8 +302,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
         }
     }
 
-    _scheduleHeartbeatToTarget_inlock(
-        target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
+    _scheduleHeartbeatToTarget_inlock(target, std::max(now, action.getNextHeartbeatStartDate()));
 
     _handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk));
 }
@@ -832,18 +833,26 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
 }
 
 void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(
-    const StatusWith<executor::TaskExecutor::CallbackHandle>& handle) {
+    const StatusWith<executor::TaskExecutor::CallbackHandle>& handle,
+    HeartbeatState hbState,
+    const HostAndPort& target) {
     if (handle.getStatus() == ErrorCodes::ShutdownInProgress) {
         return;
    }
     fassert(18912, handle.getStatus());
-    _heartbeatHandles.push_back(handle.getValue());
+
+    // The target's HostAndPort should be safe to store, because it cannot change without a
+    // reconfig. On reconfig, all current heartbeats get cancelled and new requests are sent out, so
+    // there should not be a situation where the target node's HostAndPort changes but this
+    // heartbeat handle remains active.
+    _heartbeatHandles.push_back({handle.getValue(), hbState, target});
 }
 
 void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(
     const executor::TaskExecutor::CallbackHandle& handle) {
-    const HeartbeatHandles::iterator newEnd =
-        std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle);
+    const auto newEnd = std::remove_if(_heartbeatHandles.begin(),
+                                       _heartbeatHandles.end(),
+                                       [&](auto& hbHandle) { return hbHandle.handle == handle; });
     invariant(newEnd != _heartbeatHandles.end());
     _heartbeatHandles.erase(newEnd, _heartbeatHandles.end());
 }
@@ -851,8 +860,8 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(
 
 void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
     LOGV2_FOR_HEARTBEATS(4615630, 2, "Cancelling all heartbeats");
 
-    for (const auto& handle : _heartbeatHandles) {
-        _replExecutor->cancel(handle);
+    for (const auto& hbHandle : _heartbeatHandles) {
+        _replExecutor->cancel(hbHandle.handle);
     }
     // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with
     // CallbackCanceled status, so it's better to leave the handles in the list, for now.
@@ -862,24 +871,45 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
     }
 }
 
-void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() {
-    _cancelHeartbeats_inlock();
-    _startHeartbeats_inlock();
+void ReplicationCoordinatorImpl::_restartScheduledHeartbeats_inlock() {
+    LOGV2_FOR_HEARTBEATS(5031800, 2, "Restarting all scheduled heartbeats");
+
+    const Date_t now = _replExecutor->now();
+    stdx::unordered_set<HostAndPort> restartedTargets;
+
+    for (auto& hbHandle : _heartbeatHandles) {
+        // Only cancel heartbeats that are scheduled. If a heartbeat request has already been
+        // sent, we should wait for the response instead.
+        if (hbHandle.hbState != HeartbeatState::kScheduled) {
+            continue;
+        }
+
+        LOGV2_FOR_HEARTBEATS(5031802, 2, "Restarting heartbeat", "target"_attr = hbHandle.target);
+        _replExecutor->cancel(hbHandle.handle);
+
+        // Track the members that we have cancelled heartbeats.
+        restartedTargets.insert(hbHandle.target);
+    }
+
+    for (auto target : restartedTargets) {
+        _scheduleHeartbeatToTarget_inlock(target, now);
+        _topCoord->restartHeartbeat(now, target);
+    }
 }
 
 void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
     const Date_t now = _replExecutor->now();
     _seedList.clear();
+
     for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
         if (i == _selfIndex) {
             continue;
         }
-        _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
+        auto target = _rsConfig.getMemberAt(i).getHostAndPort();
+        _scheduleHeartbeatToTarget_inlock(target, now);
+        _topCoord->restartHeartbeat(now, target);
     }
 
-    _topCoord->restartHeartbeats();
-
-    _topCoord->resetAllMemberTimeouts(_replExecutor->now());
     _scheduleNextLivenessUpdate_inlock();
 }
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 851d457b333..a04a55d7f2f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -205,8 +205,7 @@ TEST_F(ReplCoordHBV1Test,
     ASSERT_TRUE(getExternalState()->threadsStarted());
 }
 
-TEST_F(ReplCoordHBV1Test,
-       SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) {
+TEST_F(ReplCoordHBV1Test, RestartingHeartbeatsShouldOnlyCancelScheduledHeartbeats) {
     auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{
         logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)};
 
@@ -226,18 +225,104 @@ TEST_F(ReplCoordHBV1Test,
     getReplCoord()->updateTerm_forTest(1, nullptr);
     ASSERT_EQ(getReplCoord()->getTerm(), 1);
 
+    auto rsConfig = getReplCoord()->getConfig();
+
     enterNetwork();
-    // Ignore the first 2 messages.
     for (int j = 0; j < 2; ++j) {
         const auto noi = getNet()->getNextReadyRequest();
-        noi->getRequest();
-        getNet()->blackHole(noi);
+        const RemoteCommandRequest& hbrequest = noi->getRequest();
+
+        // Skip responding to node2's heartbeat request, so that it stays in SENT state.
+        if (hbrequest.target == HostAndPort("node2", 12345)) {
+            getNet()->blackHole(noi);
+            continue;
+        }
+
+        // Respond to node3's heartbeat request so that we schedule a new heartbeat request that
+        // stays in SCHEDULED state.
+        ReplSetHeartbeatResponse hbResp;
+        hbResp.setSetName("mySet");
+        hbResp.setState(MemberState::RS_SECONDARY);
+        hbResp.setConfigVersion(rsConfig.getConfigVersion());
+        // The smallest valid optime in PV1.
+        OpTime opTime(Timestamp(), 0);
+        hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t()});
+        hbResp.setDurableOpTimeAndWallTime({opTime, Date_t()});
+        BSONObjBuilder responseBuilder;
+        responseBuilder << "ok" << 1;
+        hbResp.addToBSON(&responseBuilder);
+        getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseBuilder.obj()));
+
+        getNet()->runReadyNetworkOperations();
     }
     ASSERT_FALSE(getNet()->hasReadyRequests());
     exitNetwork();
 
+    // Receive a request from node3 saying it's the primary, so that we restart scheduled
+    // heartbeats.
+    receiveHeartbeatFrom(rsConfig,
+                         3 /* senderId */,
+                         HostAndPort("node3", 12345),
+                         1 /* term */,
+                         3 /* currentPrimaryId */);
+
+    enterNetwork();
+
+    // Verify that only node3's heartbeat request was cancelled.
+    ASSERT_TRUE(getNet()->hasReadyRequests());
+    const auto noi = getNet()->getNextReadyRequest();
+    // 'request' represents the request sent from self(node1) back to node3.
+    const RemoteCommandRequest& request = noi->getRequest();
+    ReplSetHeartbeatArgsV1 args;
+    ASSERT_OK(args.initialize(request.cmdObj));
+    ASSERT_EQ(request.target, HostAndPort("node3", 12345));
+    ASSERT_EQ(args.getPrimaryId(), -1);
+    // We don't need to respond.
+    getNet()->blackHole(noi);
+
+    // The heartbeat request for node2 should not have been cancelled, so there should not be any
+    // more network ready requests.
+    ASSERT_FALSE(getNet()->hasReadyRequests());
+    exitNetwork();
+}
+
+TEST_F(ReplCoordHBV1Test,
+       SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) {
+    auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{
+        logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)};
+
+    auto replConfigBson = BSON("_id"
+                               << "mySet"
+                               << "protocolVersion" << 1 << "version" << 1 << "members"
+                               << BSON_ARRAY(BSON("_id" << 1 << "host"
+                                                        << "node1:12345")
+                                             << BSON("_id" << 2 << "host"
+                                                           << "node2:12345")
+                                             << BSON("_id" << 3 << "host"
+                                                           << "node3:12345")));
+
+    assertStartSuccess(replConfigBson, HostAndPort("node1", 12345));
+    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+    getReplCoord()->updateTerm_forTest(1, nullptr);
+    ASSERT_EQ(getReplCoord()->getTerm(), 1);
+
+    auto rsConfig = getReplCoord()->getConfig();
+
+    for (int j = 0; j < 2; ++j) {
+        // Respond to the initial heartbeat request so that we schedule a new heartbeat request that
+        // stays in SCHEDULED state.
+        replyToReceivedHeartbeatV1();
+    }
+
+    // Verify that there are no further heartbeat requests, since the heartbeat requests should be
+    // scheduled for the future.
+    enterNetwork();
+    ASSERT_FALSE(getNet()->hasReadyRequests());
+    exitNetwork();
+
     // We're a secondary and we receive a request from node3 saying it's the primary.
-    receiveHeartbeatFrom(getReplCoord()->getConfig(),
+    receiveHeartbeatFrom(rsConfig,
                          3 /* senderId */,
                          HostAndPort("node3", 12345),
                          1 /* term */,
@@ -262,7 +347,7 @@ TEST_F(ReplCoordHBV1Test,
     exitNetwork();
 
     // Heartbeat in a stale term shouldn't re-schedule heartbeats.
-    receiveHeartbeatFrom(getReplCoord()->getConfig(),
+    receiveHeartbeatFrom(rsConfig,
                          3 /* senderId */,
                          HostAndPort("node3", 12345),
                          0 /* term */,
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 3ca889fe529..bb2a614ffa4 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -1194,11 +1194,6 @@ std::pair<MemberId, Date_t> TopologyCoordinator::getStalestLiveMember() const {
     return std::make_pair(earliestMemberId, earliestDate);
 }
 
-void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) {
-    for (auto&& memberData : _memberData)
-        memberData.updateLiveness(now);
-}
-
 void TopologyCoordinator::resetMemberTimeouts(Date_t now,
                                               const stdx::unordered_set<HostAndPort>& member_set) {
     for (auto&& memberData : _memberData) {
@@ -3167,9 +3162,13 @@ void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported)
         supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
 }
 
-void TopologyCoordinator::restartHeartbeats() {
-    for (auto& hb : _memberData) {
-        hb.restart();
+void TopologyCoordinator::restartHeartbeat(const Date_t now, const HostAndPort& target) {
+    for (auto&& member : _memberData) {
+        if (member.getHostAndPort() == target) {
+            member.restart();
+            member.updateLiveness(now);
+            return;
+        }
     }
 }
 
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index fa443aa6f07..f8196b690cf 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -487,11 +487,6 @@ public:
     HeartbeatResponseAction checkMemberTimeouts(Date_t now);
 
     /**
-     * Set all nodes in memberData to not stale with a lastUpdate of "now".
-     */
-    void resetAllMemberTimeouts(Date_t now);
-
-    /**
      * Set all nodes in memberData that are present in member_set
      * to not stale with a lastUpdate of "now".
      */
@@ -717,9 +712,9 @@ public:
     void setStorageEngineSupportsReadCommitted(bool supported);
 
     /**
-     * Reset the booleans to record the last heartbeat restart.
+     * Reset the booleans to record the last heartbeat restart for the target node.
      */
-    void restartHeartbeats();
+    void restartHeartbeat(const Date_t now, const HostAndPort& target);
 
     /**
      * Increments the counter field of the current TopologyVersion.