diff options
Diffstat (limited to 'src/mongo')
14 files changed, 192 insertions, 85 deletions
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index a02a4d32ccb..e030bc22959 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -143,7 +143,7 @@ public: } return true; } else if (cmdObj.hasElement("restartHeartbeats")) { - replCoord->restartHeartbeats_forTest(); + replCoord->restartScheduledHeartbeats_forTest(); return true; } diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index e4f9db71181..e379f7fa114 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -1043,7 +1043,7 @@ public: /** * A testing only function that cancels and reschedules replication heartbeats immediately. */ - virtual void restartHeartbeats_forTest() = 0; + virtual void restartScheduledHeartbeats_forTest() = 0; protected: ReplicationCoordinator(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 766d69ffbc2..d2339838892 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -4137,7 +4137,7 @@ void ReplicationCoordinatorImpl::_postWonElectionUpdateMemberState(WithLock lk) // Clear the sync source. _onFollowerModeStateChange(); // Notify all secondaries of the election win. - _restartHeartbeats_inlock(); + _restartScheduledHeartbeats_inlock(); invariant(!_catchupState); _catchupState = std::make_unique<CatchupState>(this); _catchupState->start_inlock(); @@ -4711,7 +4711,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp // of other members's state, allowing us to make informed sync source decisions. 
if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 && !_getMemberState_inlock().primary()) { - _restartHeartbeats_inlock(); + _restartScheduledHeartbeats_inlock(); } return newSyncSource; @@ -5205,7 +5205,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs "Scheduling heartbeat to fetch a new config since we are not " "a member of our current config", "senderHost"_attr = senderHost); - _scheduleHeartbeatToTarget_inlock(senderHost, -1, now); + _scheduleHeartbeatToTarget_inlock(senderHost, now); } } else if (result.isOK() && response->getConfigVersionAndTerm() < args.getConfigVersionAndTerm()) { @@ -5230,8 +5230,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs // will trigger reconfig, which cancels and reschedules all heartbeats. else if (args.hasSender()) { LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr); - int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost); - _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now); + _scheduleHeartbeatToTarget_inlock(senderHost, now); } } else if (result.isOK() && args.getPrimaryId() >= 0 && (!response->hasPrimaryId() || response->getPrimaryId() != args.getPrimaryId())) { @@ -5250,7 +5249,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs "myPrimaryId"_attr = myPrimaryId, "senderAndPrimaryId"_attr = args.getPrimaryId(), "senderTerm"_attr = args.getTerm()); - _restartHeartbeats_inlock(); + _restartScheduledHeartbeats_inlock(); } } return result; diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index dac623362a5..66556fac3ef 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -383,7 +383,7 @@ public: OnRemoteCmdScheduledFn onRemoteCmdScheduled, OnRemoteCmdCompleteFn onRemoteCmdComplete) override; - virtual void 
restartHeartbeats_forTest() override; + virtual void restartScheduledHeartbeats_forTest() override; // ================== Test support API =================== @@ -677,7 +677,12 @@ private: std::multimap<OpTime, SharedWaiterHandle> _waiters; }; - typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles; + enum class HeartbeatState { kScheduled = 0, kSent = 1 }; + struct HeartbeatHandle { + executor::TaskExecutor::CallbackHandle handle; + HeartbeatState hbState; + HostAndPort target; + }; // The state and logic of primary catchup. // @@ -1010,22 +1015,21 @@ private: bool isRollbackAllowed); /** - * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index - * into the replica set config members array that corresponds to the "target", or -1 if - * "target" is not in _rsConfig. + * Schedules a heartbeat to be sent to "target" at "when". */ - void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, int targetIndex, Date_t when); + void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, Date_t when); /** * Processes each heartbeat response. * * Schedules additional heartbeats, triggers elections and step downs, etc. */ - void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - int targetIndex); + void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData); void _trackHeartbeatHandle_inlock( - const StatusWith<executor::TaskExecutor::CallbackHandle>& handle); + const StatusWith<executor::TaskExecutor::CallbackHandle>& handle, + HeartbeatState hbState, + const HostAndPort& target); void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle); @@ -1046,21 +1050,17 @@ private: void _cancelHeartbeats_inlock(); /** - * Cancels all heartbeats, then starts a heartbeat for each member in the current config. - * Called while holding replCoord _mutex. 
+ * Cancels all heartbeats that have been scheduled but not yet sent out, then reschedules them + * at the current time immediately. Called while holding replCoord _mutex. */ - void _restartHeartbeats_inlock(); + void _restartScheduledHeartbeats_inlock(); /** - * Asynchronously sends a heartbeat to "target". "targetIndex" is the index - * into the replica set config members array that corresponds to the "target", or -1 if - * we don't have a valid replica set config. + * Asynchronously sends a heartbeat to "target". * * Scheduled by _scheduleHeartbeatToTarget_inlock. */ - void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, - const HostAndPort& target, - int targetIndex); + void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, const HostAndPort& target); MemberState _getMemberState_inlock() const; @@ -1496,7 +1496,7 @@ private: mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicationCoordinatorImpl::_mutex"); // (S) // Handles to actively queued heartbeats. - HeartbeatHandles _heartbeatHandles; // (M) + std::vector<HeartbeatHandle> _heartbeatHandles; // (M) // When this node does not know itself to be a member of a config, it adds // every host that sends it a heartbeat request to this set, and also starts diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 1f0684dc66f..c3fc3a33ef5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -93,8 +93,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() { } void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, - const HostAndPort& target, - int targetIndex) { + const HostAndPort& target) { stdx::lock_guard<Latch> lk(_mutex); _untrackHeartbeatHandle_inlock(cbData.myHandle); @@ -114,7 +113,7 @@ void 
ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); const executor::TaskExecutor::RemoteCommandCallbackFn callback = [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - return _handleHeartbeatResponse(cbData, targetIndex); + return _handleHeartbeatResponse(cbData); }; LOGV2_FOR_HEARTBEATS(4615670, @@ -124,11 +123,11 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call "requestId"_attr = request.id, "target"_attr = target, "heartbeatObj"_attr = heartbeatObj); - _trackHeartbeatHandle_inlock(_replExecutor->scheduleRemoteCommand(request, callback)); + _trackHeartbeatHandle_inlock( + _replExecutor->scheduleRemoteCommand(request, callback), HeartbeatState::kSent, target); } void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAndPort& target, - int targetIndex, Date_t when) { LOGV2_FOR_HEARTBEATS(4615618, 2, @@ -136,10 +135,13 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd "Scheduling heartbeat", "target"_attr = target, "when"_attr = when); - _trackHeartbeatHandle_inlock(_replExecutor->scheduleWorkAt( - when, [=](const executor::TaskExecutor::CallbackArgs& cbData) { - _doMemberHeartbeat(cbData, target, targetIndex); - })); + _trackHeartbeatHandle_inlock( + _replExecutor->scheduleWorkAt(when, + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _doMemberHeartbeat(cbData, target); + }), + HeartbeatState::kScheduled, + target); } void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj response, @@ -160,14 +162,14 @@ void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj respons _replExecutor->now(), _rsConfig.getReplSetName(), request.target); // Pretend we sent a request so that _untrackHeartbeatHandle_inlock succeeds. 
- _trackHeartbeatHandle_inlock(handle); + _trackHeartbeatHandle_inlock(handle, HeartbeatState::kSent, request.target); } - _handleHeartbeatResponse(cbData, targetIndex); + _handleHeartbeatResponse(cbData); } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { stdx::unique_lock<Latch> lk(_mutex); // remove handle from queued heartbeats @@ -345,8 +347,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( } } - _scheduleHeartbeatToTarget_inlock( - target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); + _scheduleHeartbeatToTarget_inlock(target, std::max(now, action.getNextHeartbeatStartDate())); _handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk)); } @@ -857,18 +858,26 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( } void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock( - const StatusWith<executor::TaskExecutor::CallbackHandle>& handle) { + const StatusWith<executor::TaskExecutor::CallbackHandle>& handle, + HeartbeatState hbState, + const HostAndPort& target) { if (handle.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18912, handle.getStatus()); - _heartbeatHandles.push_back(handle.getValue()); + + // The target's HostAndPort should be safe to store, because it cannot change without a + // reconfig. On reconfig, all current heartbeats get cancelled and new requests are sent out, so + // there should not be a situation where the target node's HostAndPort changes but this + // heartbeat handle remains active. 
+ _heartbeatHandles.push_back({handle.getValue(), hbState, target}); } void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( const executor::TaskExecutor::CallbackHandle& handle) { - const HeartbeatHandles::iterator newEnd = - std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle); + const auto newEnd = std::remove_if(_heartbeatHandles.begin(), + _heartbeatHandles.end(), + [&](auto& hbHandle) { return hbHandle.handle == handle; }); invariant(newEnd != _heartbeatHandles.end()); _heartbeatHandles.erase(newEnd, _heartbeatHandles.end()); } @@ -876,8 +885,8 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { LOGV2_FOR_HEARTBEATS(4615630, 2, "Cancelling all heartbeats"); - for (const auto& handle : _heartbeatHandles) { - _replExecutor->cancel(handle); + for (const auto& hbHandle : _heartbeatHandles) { + _replExecutor->cancel(hbHandle.handle); } // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with // CallbackCanceled status, so it's better to leave the handles in the list, for now. 
@@ -887,31 +896,51 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { } } -void ReplicationCoordinatorImpl::restartHeartbeats_forTest() { +void ReplicationCoordinatorImpl::restartScheduledHeartbeats_forTest() { stdx::unique_lock<Latch> lk(_mutex); invariant(getTestCommandsEnabled()); - LOGV2_FOR_HEARTBEATS(4406800, 0, "Restarting heartbeats"); - _restartHeartbeats_inlock(); + _restartScheduledHeartbeats_inlock(); }; -void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() { - _cancelHeartbeats_inlock(); - _startHeartbeats_inlock(); +void ReplicationCoordinatorImpl::_restartScheduledHeartbeats_inlock() { + LOGV2_FOR_HEARTBEATS(5031800, 2, "Restarting all scheduled heartbeats"); + + const Date_t now = _replExecutor->now(); + stdx::unordered_set<HostAndPort> restartedTargets; + + for (auto& hbHandle : _heartbeatHandles) { + // Only cancel heartbeats that are scheduled. If a heartbeat request has already been + // sent, we should wait for the response instead. + if (hbHandle.hbState != HeartbeatState::kScheduled) { + continue; + } + + LOGV2_FOR_HEARTBEATS(5031802, 2, "Restarting heartbeat", "target"_attr = hbHandle.target); + _replExecutor->cancel(hbHandle.handle); + + // Track the members whose heartbeats we have cancelled. 
+ restartedTargets.insert(hbHandle.target); + } + + for (auto target : restartedTargets) { + _scheduleHeartbeatToTarget_inlock(target, now); + _topCoord->restartHeartbeat(now, target); + } } void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { const Date_t now = _replExecutor->now(); _seedList.clear(); + for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { if (i == _selfIndex) { continue; } - _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); + auto target = _rsConfig.getMemberAt(i).getHostAndPort(); + _scheduleHeartbeatToTarget_inlock(target, now); + _topCoord->restartHeartbeat(now, target); } - _topCoord->restartHeartbeats(); - - _topCoord->resetAllMemberTimeouts(_replExecutor->now()); _scheduleNextLivenessUpdate_inlock(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index ebf5162a8c6..058a765d9bb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -186,8 +186,7 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } -TEST_F(ReplCoordHBV1Test, - SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) { +TEST_F(ReplCoordHBV1Test, RestartingHeartbeatsShouldOnlyCancelScheduledHeartbeats) { auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{ logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)}; @@ -207,18 +206,104 @@ TEST_F(ReplCoordHBV1Test, getReplCoord()->updateTerm_forTest(1, nullptr); ASSERT_EQ(getReplCoord()->getTerm(), 1); + auto rsConfig = getReplCoord()->getConfig(); + enterNetwork(); - // Ignore the first 2 messages. 
for (int j = 0; j < 2; ++j) { const auto noi = getNet()->getNextReadyRequest(); - noi->getRequest(); - getNet()->blackHole(noi); + const RemoteCommandRequest& hbrequest = noi->getRequest(); + + // Skip responding to node2's heartbeat request, so that it stays in SENT state. + if (hbrequest.target == HostAndPort("node2", 12345)) { + getNet()->blackHole(noi); + continue; + } + + // Respond to node3's heartbeat request so that we schedule a new heartbeat request that + // stays in SCHEDULED state. + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + // The smallest valid optime in PV1. + OpTime opTime(Timestamp(), 0); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t()}); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseBuilder.obj())); + + getNet()->runReadyNetworkOperations(); } ASSERT_FALSE(getNet()->hasReadyRequests()); exitNetwork(); + // Receive a request from node3 saying it's the primary, so that we restart scheduled + // heartbeats. + receiveHeartbeatFrom(rsConfig, + 3 /* senderId */, + HostAndPort("node3", 12345), + 1 /* term */, + 3 /* currentPrimaryId */); + + enterNetwork(); + + // Verify that only node3's heartbeat request was cancelled. + ASSERT_TRUE(getNet()->hasReadyRequests()); + const auto noi = getNet()->getNextReadyRequest(); + // 'request' represents the request sent from self(node1) back to node3. + const RemoteCommandRequest& request = noi->getRequest(); + ReplSetHeartbeatArgsV1 args; + ASSERT_OK(args.initialize(request.cmdObj)); + ASSERT_EQ(request.target, HostAndPort("node3", 12345)); + ASSERT_EQ(args.getPrimaryId(), -1); + // We don't need to respond. 
+ getNet()->blackHole(noi); + + // The heartbeat request for node2 should not have been cancelled, so there should not be any + // more network ready requests. + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); +} + +TEST_F(ReplCoordHBV1Test, + SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) { + auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{ + logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)}; + + auto replConfigBson = BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + + assertStartSuccess(replConfigBson, HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + getReplCoord()->updateTerm_forTest(1, nullptr); + ASSERT_EQ(getReplCoord()->getTerm(), 1); + + auto rsConfig = getReplCoord()->getConfig(); + + for (int j = 0; j < 2; ++j) { + // Respond to the initial heartbeat request so that we schedule a new heartbeat request that + // stays in SCHEDULED state. + replyToReceivedHeartbeatV1(); + } + + // Verify that there are no further heartbeat requests, since the heartbeat requests should be + // scheduled for the future. + enterNetwork(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + // We're a secondary and we receive a request from node3 saying it's the primary. - receiveHeartbeatFrom(getReplCoord()->getConfig(), + receiveHeartbeatFrom(rsConfig, 3 /* senderId */, HostAndPort("node3", 12345), 1 /* term */, @@ -243,7 +328,7 @@ TEST_F(ReplCoordHBV1Test, exitNetwork(); // Heartbeat in a stale term shouldn't re-schedule heartbeats. 
- receiveHeartbeatFrom(getReplCoord()->getConfig(), + receiveHeartbeatFrom(rsConfig, 3 /* senderId */, HostAndPort("node3", 12345), 0 /* term */, diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index f3f53672707..ced4341556c 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -678,7 +678,7 @@ BSONObj ReplicationCoordinatorMock::runCmdOnPrimaryAndAwaitResponse( OnRemoteCmdCompleteFn onRemoteCmdComplete) { return BSON("ok" << 1); } -void ReplicationCoordinatorMock::restartHeartbeats_forTest() { +void ReplicationCoordinatorMock::restartScheduledHeartbeats_forTest() { return; } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 13d85b97ed1..57dd218957a 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -362,7 +362,7 @@ public: const BSONObj& cmdObj, OnRemoteCmdScheduledFn onRemoteCmdScheduled, OnRemoteCmdCompleteFn onRemoteCmdComplete) override; - virtual void restartHeartbeats_forTest() override; + virtual void restartScheduledHeartbeats_forTest() override; private: ServiceContext* const _service; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index ac3433eff02..30fba54905e 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -535,7 +535,7 @@ BSONObj ReplicationCoordinatorNoOp::runCmdOnPrimaryAndAwaitResponse( MONGO_UNREACHABLE; } -void ReplicationCoordinatorNoOp::restartHeartbeats_forTest() { +void ReplicationCoordinatorNoOp::restartScheduledHeartbeats_forTest() { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 6cc54cb6664..66cc6927276 100644 --- 
a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -297,7 +297,7 @@ public: OnRemoteCmdScheduledFn onRemoteCmdScheduled, OnRemoteCmdCompleteFn onRemoteCmdComplete) override; - virtual void restartHeartbeats_forTest() final; + virtual void restartScheduledHeartbeats_forTest() final; private: ServiceContext* const _service; diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index c755830380a..181e9f5ddfc 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -1287,11 +1287,6 @@ std::pair<MemberId, Date_t> TopologyCoordinator::getStalestLiveMember() const { return std::make_pair(earliestMemberId, earliestDate); } -void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) { - for (auto&& memberData : _memberData) - memberData.updateLiveness(now); -} - void TopologyCoordinator::resetMemberTimeouts(Date_t now, const stdx::unordered_set<HostAndPort>& member_set) { for (auto&& memberData : _memberData) { @@ -3300,9 +3295,13 @@ void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } -void TopologyCoordinator::restartHeartbeats() { - for (auto& hb : _memberData) { - hb.restart(); +void TopologyCoordinator::restartHeartbeat(const Date_t now, const HostAndPort& target) { + for (auto&& member : _memberData) { + if (member.getHostAndPort() == target) { + member.restart(); + member.updateLiveness(now); + return; + } } } diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 5f7f9fbe2c1..cbde82f1834 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -524,11 +524,6 @@ public: HeartbeatResponseAction checkMemberTimeouts(Date_t now); /** - * Set all nodes in memberData to not stale with a lastUpdate of "now". 
- */ - void resetAllMemberTimeouts(Date_t now); - - /** * Set all nodes in memberData that are present in member_set * to not stale with a lastUpdate of "now". */ @@ -754,9 +749,9 @@ public: void setStorageEngineSupportsReadCommitted(bool supported); /** - * Reset the booleans to record the last heartbeat restart. + * Reset the booleans to record the last heartbeat restart for the target node. */ - void restartHeartbeats(); + void restartHeartbeat(const Date_t now, const HostAndPort& target); /** * Increments the counter field of the current TopologyVersion. diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index e8e43f17bdb..b5912ea9ea1 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -560,7 +560,7 @@ BSONObj ReplicationCoordinatorEmbedded::runCmdOnPrimaryAndAwaitResponse( MONGO_UNREACHABLE; } -void ReplicationCoordinatorEmbedded::restartHeartbeats_forTest() { +void ReplicationCoordinatorEmbedded::restartScheduledHeartbeats_forTest() { MONGO_UNREACHABLE; } diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 502333a0b6b..ae69921c944 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -305,7 +305,7 @@ public: OnRemoteCmdScheduledFn onRemoteCmdScheduled, OnRemoteCmdCompleteFn onRemoteCmdComplete) final; - virtual void restartHeartbeats_forTest() override; + virtual void restartScheduledHeartbeats_forTest() override; private: // Back pointer to the ServiceContext that has started the instance. |