diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 89 |
1 files changed, 59 insertions, 30 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index b1cc8195122..1b8912c3480 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -93,8 +93,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() { } void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, - const HostAndPort& target, - int targetIndex) { + const HostAndPort& target) { stdx::lock_guard<Latch> lk(_mutex); _untrackHeartbeatHandle_inlock(cbData.myHandle); @@ -114,7 +113,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); const executor::TaskExecutor::RemoteCommandCallbackFn callback = [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - return _handleHeartbeatResponse(cbData, targetIndex); + return _handleHeartbeatResponse(cbData); }; LOGV2_FOR_HEARTBEATS(4615670, @@ -124,11 +123,11 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call "requestId"_attr = request.id, "target"_attr = target, "heartbeatObj"_attr = heartbeatObj); - _trackHeartbeatHandle_inlock(_replExecutor->scheduleRemoteCommand(request, callback)); + _trackHeartbeatHandle_inlock( + _replExecutor->scheduleRemoteCommand(request, callback), HeartbeatState::kSent, target); } void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAndPort& target, - int targetIndex, Date_t when) { LOGV2_FOR_HEARTBEATS(4615618, 2, @@ -136,10 +135,13 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd "Scheduling heartbeat", "target"_attr = target, "when"_attr = when); - _trackHeartbeatHandle_inlock(_replExecutor->scheduleWorkAt( - when, [=](const executor::TaskExecutor::CallbackArgs& cbData) { - _doMemberHeartbeat(cbData, target, targetIndex); - })); + _trackHeartbeatHandle_inlock( + _replExecutor->scheduleWorkAt(when, + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _doMemberHeartbeat(cbData, target); + }), + HeartbeatState::kScheduled, + target); } void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj response, @@ -160,14 +162,14 @@ void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj respons _replExecutor->now(), _rsConfig.getReplSetName(), request.target); // Pretend we sent a request so that _untrackHeartbeatHandle_inlock succeeds. - _trackHeartbeatHandle_inlock(handle); + _trackHeartbeatHandle_inlock(handle, HeartbeatState::kSent, request.target); } - _handleHeartbeatResponse(cbData, targetIndex); + _handleHeartbeatResponse(cbData); } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { stdx::unique_lock<Latch> lk(_mutex); // remove handle from queued heartbeats @@ -345,8 +347,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( } } - _scheduleHeartbeatToTarget_inlock( - target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); + _scheduleHeartbeatToTarget_inlock(target, std::max(now, action.getNextHeartbeatStartDate())); _handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk)); } @@ -857,18 +858,26 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( } void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock( - const StatusWith<executor::TaskExecutor::CallbackHandle>& handle) { + const StatusWith<executor::TaskExecutor::CallbackHandle>& handle, + HeartbeatState hbState, + const HostAndPort& target) { if (handle.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18912, handle.getStatus()); - _heartbeatHandles.push_back(handle.getValue()); + + // The target's HostAndPort should be safe to store, because it cannot change without a + // reconfig. On reconfig, all current heartbeats get cancelled and new requests are sent out, so + // there should not be a situation where the target node's HostAndPort changes but this + // heartbeat handle remains active. + _heartbeatHandles.push_back({handle.getValue(), hbState, target}); } void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( const executor::TaskExecutor::CallbackHandle& handle) { - const HeartbeatHandles::iterator newEnd = - std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle); + const auto newEnd = std::remove_if(_heartbeatHandles.begin(), + _heartbeatHandles.end(), + [&](auto& hbHandle) { return hbHandle.handle == handle; }); invariant(newEnd != _heartbeatHandles.end()); _heartbeatHandles.erase(newEnd, _heartbeatHandles.end()); } @@ -876,8 +885,8 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { LOGV2_FOR_HEARTBEATS(4615630, 2, "Cancelling all heartbeats"); - for (const auto& handle : _heartbeatHandles) { - _replExecutor->cancel(handle); + for (const auto& hbHandle : _heartbeatHandles) { + _replExecutor->cancel(hbHandle.handle); } // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with // CallbackCanceled status, so it's better to leave the handles in the list, for now. @@ -887,31 +896,51 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { } } -void ReplicationCoordinatorImpl::restartHeartbeats_forTest() { +void ReplicationCoordinatorImpl::restartScheduledHeartbeats_forTest() { stdx::unique_lock<Latch> lk(_mutex); invariant(getTestCommandsEnabled()); - LOGV2_FOR_HEARTBEATS(4406800, 0, "Restarting heartbeats"); - _restartHeartbeats_inlock(); + _restartScheduledHeartbeats_inlock(); }; -void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() { - _cancelHeartbeats_inlock(); - _startHeartbeats_inlock(); +void ReplicationCoordinatorImpl::_restartScheduledHeartbeats_inlock() { + LOGV2_FOR_HEARTBEATS(5031800, 2, "Restarting all scheduled heartbeats"); + + const Date_t now = _replExecutor->now(); + stdx::unordered_set<HostAndPort> restartedTargets; + + for (auto& hbHandle : _heartbeatHandles) { + // Only cancel heartbeats that are scheduled. If a heartbeat request has already been + // sent, we should wait for the response instead. + if (hbHandle.hbState != HeartbeatState::kScheduled) { + continue; + } + + LOGV2_FOR_HEARTBEATS(5031802, 2, "Restarting heartbeat", "target"_attr = hbHandle.target); + _replExecutor->cancel(hbHandle.handle); + + // Track the members that we have cancelled heartbeats. + restartedTargets.insert(hbHandle.target); + } + + for (auto target : restartedTargets) { + _scheduleHeartbeatToTarget_inlock(target, now); + _topCoord->restartHeartbeat(now, target); + } } void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { const Date_t now = _replExecutor->now(); _seedList.clear(); + for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { if (i == _selfIndex) { continue; } - _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); + auto target = _rsConfig.getMemberAt(i).getHostAndPort(); + _scheduleHeartbeatToTarget_inlock(target, now); + _topCoord->restartHeartbeat(now, target); } - _topCoord->restartHeartbeats(); - - _topCoord->resetAllMemberTimeouts(_replExecutor->now()); _scheduleNextLivenessUpdate_inlock(); } |