summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp89
1 files changed, 59 insertions, 30 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index b1cc8195122..1b8912c3480 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -93,8 +93,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() {
}
void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
- const HostAndPort& target,
- int targetIndex) {
+ const HostAndPort& target) {
stdx::lock_guard<Latch> lk(_mutex);
_untrackHeartbeatHandle_inlock(cbData.myHandle);
@@ -114,7 +113,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call
target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout);
const executor::TaskExecutor::RemoteCommandCallbackFn callback =
[=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- return _handleHeartbeatResponse(cbData, targetIndex);
+ return _handleHeartbeatResponse(cbData);
};
LOGV2_FOR_HEARTBEATS(4615670,
@@ -124,11 +123,11 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call
"requestId"_attr = request.id,
"target"_attr = target,
"heartbeatObj"_attr = heartbeatObj);
- _trackHeartbeatHandle_inlock(_replExecutor->scheduleRemoteCommand(request, callback));
+ _trackHeartbeatHandle_inlock(
+ _replExecutor->scheduleRemoteCommand(request, callback), HeartbeatState::kSent, target);
}
void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAndPort& target,
- int targetIndex,
Date_t when) {
LOGV2_FOR_HEARTBEATS(4615618,
2,
@@ -136,10 +135,13 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd
"Scheduling heartbeat",
"target"_attr = target,
"when"_attr = when);
- _trackHeartbeatHandle_inlock(_replExecutor->scheduleWorkAt(
- when, [=](const executor::TaskExecutor::CallbackArgs& cbData) {
- _doMemberHeartbeat(cbData, target, targetIndex);
- }));
+ _trackHeartbeatHandle_inlock(
+ _replExecutor->scheduleWorkAt(when,
+ [=](const executor::TaskExecutor::CallbackArgs& cbData) {
+ _doMemberHeartbeat(cbData, target);
+ }),
+ HeartbeatState::kScheduled,
+ target);
}
void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj response,
@@ -160,14 +162,14 @@ void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj respons
_replExecutor->now(), _rsConfig.getReplSetName(), request.target);
// Pretend we sent a request so that _untrackHeartbeatHandle_inlock succeeds.
- _trackHeartbeatHandle_inlock(handle);
+ _trackHeartbeatHandle_inlock(handle, HeartbeatState::kSent, request.target);
}
- _handleHeartbeatResponse(cbData, targetIndex);
+ _handleHeartbeatResponse(cbData);
}
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
stdx::unique_lock<Latch> lk(_mutex);
// remove handle from queued heartbeats
@@ -345,8 +347,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
}
}
- _scheduleHeartbeatToTarget_inlock(
- target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
+ _scheduleHeartbeatToTarget_inlock(target, std::max(now, action.getNextHeartbeatStartDate()));
_handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk));
}
@@ -857,18 +858,26 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
}
void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(
- const StatusWith<executor::TaskExecutor::CallbackHandle>& handle) {
+ const StatusWith<executor::TaskExecutor::CallbackHandle>& handle,
+ HeartbeatState hbState,
+ const HostAndPort& target) {
if (handle.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18912, handle.getStatus());
- _heartbeatHandles.push_back(handle.getValue());
+
+ // The target's HostAndPort should be safe to store, because it cannot change without a
+ // reconfig. On reconfig, all current heartbeats get cancelled and new requests are sent out, so
+ // there should not be a situation where the target node's HostAndPort changes but this
+ // heartbeat handle remains active.
+ _heartbeatHandles.push_back({handle.getValue(), hbState, target});
}
void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(
const executor::TaskExecutor::CallbackHandle& handle) {
- const HeartbeatHandles::iterator newEnd =
- std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle);
+ const auto newEnd = std::remove_if(_heartbeatHandles.begin(),
+ _heartbeatHandles.end(),
+ [&](auto& hbHandle) { return hbHandle.handle == handle; });
invariant(newEnd != _heartbeatHandles.end());
_heartbeatHandles.erase(newEnd, _heartbeatHandles.end());
}
@@ -876,8 +885,8 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(
void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
LOGV2_FOR_HEARTBEATS(4615630, 2, "Cancelling all heartbeats");
- for (const auto& handle : _heartbeatHandles) {
- _replExecutor->cancel(handle);
+ for (const auto& hbHandle : _heartbeatHandles) {
+ _replExecutor->cancel(hbHandle.handle);
}
// Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with
// CallbackCanceled status, so it's better to leave the handles in the list, for now.
@@ -887,31 +896,51 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
}
}
-void ReplicationCoordinatorImpl::restartHeartbeats_forTest() {
+void ReplicationCoordinatorImpl::restartScheduledHeartbeats_forTest() {
stdx::unique_lock<Latch> lk(_mutex);
invariant(getTestCommandsEnabled());
- LOGV2_FOR_HEARTBEATS(4406800, 0, "Restarting heartbeats");
- _restartHeartbeats_inlock();
+ _restartScheduledHeartbeats_inlock();
};
-void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() {
- _cancelHeartbeats_inlock();
- _startHeartbeats_inlock();
+void ReplicationCoordinatorImpl::_restartScheduledHeartbeats_inlock() {
+ LOGV2_FOR_HEARTBEATS(5031800, 2, "Restarting all scheduled heartbeats");
+
+ const Date_t now = _replExecutor->now();
+ stdx::unordered_set<HostAndPort> restartedTargets;
+
+ for (auto& hbHandle : _heartbeatHandles) {
+ // Only cancel heartbeats that are scheduled. If a heartbeat request has already been
+ // sent, we should wait for the response instead.
+ if (hbHandle.hbState != HeartbeatState::kScheduled) {
+ continue;
+ }
+
+ LOGV2_FOR_HEARTBEATS(5031802, 2, "Restarting heartbeat", "target"_attr = hbHandle.target);
+ _replExecutor->cancel(hbHandle.handle);
+
+ // Track the members that we have cancelled heartbeats.
+ restartedTargets.insert(hbHandle.target);
+ }
+
+ for (auto target : restartedTargets) {
+ _scheduleHeartbeatToTarget_inlock(target, now);
+ _topCoord->restartHeartbeat(now, target);
+ }
}
void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
const Date_t now = _replExecutor->now();
_seedList.clear();
+
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
if (i == _selfIndex) {
continue;
}
- _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
+ auto target = _rsConfig.getMemberAt(i).getHostAndPort();
+ _scheduleHeartbeatToTarget_inlock(target, now);
+ _topCoord->restartHeartbeat(now, target);
}
- _topCoord->restartHeartbeats();
-
- _topCoord->resetAllMemberTimeouts(_replExecutor->now());
_scheduleNextLivenessUpdate_inlock();
}