/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#define LOG_FOR_HEARTBEATS(level) \
MONGO_LOG_COMPONENT(level, ::mongo::logger::LogComponent::kReplicationHeartbeats)
#include "mongo/platform/basic.h"
#include
#include "mongo/base/status.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/heartbeat_response_action.h"
#include "mongo/db/repl/member_data.h"
#include "mongo/db/repl/repl_set_config_checks.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/service_context.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace repl {
namespace {
MONGO_FP_DECLARE(blockHeartbeatStepdown);
MONGO_FP_DECLARE(blockHeartbeatReconfigFinish);
} // namespace
using executor::RemoteCommandRequest;
Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset_inlock() {
long long electionTimeout = durationCount(_rsConfig.getElectionTimeoutPeriod());
long long randomOffsetUpperBound =
electionTimeout * _externalState->getElectionTimeoutOffsetLimitFraction();
// Avoid divide by zero error in random number generator.
if (randomOffsetUpperBound == 0) {
return Milliseconds(0);
}
return Milliseconds{_nextRandomInt64_inlock(randomOffsetUpperBound)};
}
void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex) {
stdx::lock_guard lk(_mutex);
_untrackHeartbeatHandle_inlock(cbData.myHandle);
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
const Date_t now = _replExecutor->now();
BSONObj heartbeatObj;
Milliseconds timeout(0);
if (isV1ElectionProtocol()) {
const std::pair hbRequest =
_topCoord->prepareHeartbeatRequestV1(now, _settings.ourSetName(), target);
heartbeatObj = hbRequest.first.toBSON();
timeout = hbRequest.second;
} else {
const std::pair hbRequest =
_topCoord->prepareHeartbeatRequest(now, _settings.ourSetName(), target);
heartbeatObj = hbRequest.first.toBSON();
timeout = hbRequest.second;
}
const RemoteCommandRequest request(
target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout);
const executor::TaskExecutor::RemoteCommandCallbackFn callback =
stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse,
this,
stdx::placeholders::_1,
targetIndex);
LOG_FOR_HEARTBEATS(2) << "Sending heartbeat (requestId: " << request.id << ") to " << target
<< ", " << heartbeatObj;
_trackHeartbeatHandle_inlock(_replExecutor->scheduleRemoteCommand(request, callback));
}
void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAndPort& target,
int targetIndex,
Date_t when) {
LOG_FOR_HEARTBEATS(2) << "Scheduling heartbeat to " << target << " at "
<< dateToISOStringUTC(when);
_trackHeartbeatHandle_inlock(
_replExecutor->scheduleWorkAt(when,
stdx::bind(&ReplicationCoordinatorImpl::_doMemberHeartbeat,
this,
stdx::placeholders::_1,
target,
targetIndex)));
}
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
stdx::unique_lock lk(_mutex);
// remove handle from queued heartbeats
_untrackHeartbeatHandle_inlock(cbData.myHandle);
// Parse and validate the response. At the end of this step, if responseStatus is OK then
// hbResponse is valid.
Status responseStatus = cbData.response.status;
const HostAndPort& target = cbData.request.target;
if (responseStatus == ErrorCodes::CallbackCanceled) {
LOG_FOR_HEARTBEATS(2) << "Received response to heartbeat (requestId: " << cbData.request.id
<< ") from " << target << " but the heartbeat was cancelled.";
return;
}
ReplSetHeartbeatResponse hbResponse;
BSONObj resp;
if (responseStatus.isOK()) {
resp = cbData.response.data;
responseStatus = hbResponse.initialize(resp, _topCoord->getTerm());
StatusWith replMetadata =
rpc::ReplSetMetadata::readFromMetadata(cbData.response.metadata);
LOG_FOR_HEARTBEATS(2) << "Received response to heartbeat (requestId: " << cbData.request.id
<< ") from " << target << ", " << resp;
// Reject heartbeat responses (and metadata) from nodes with mismatched replica set IDs.
// It is problematic to perform this check in the heartbeat reconfiguring logic because it
// is possible for two mismatched replica sets to have the same replica set name and
// configuration version. A heartbeat reconfiguration would not take place in that case.
// Additionally, this is where we would stop further processing of the metadata from an
// unknown replica set.
if (replMetadata.isOK() && _rsConfig.isInitialized() && _rsConfig.hasReplicaSetId() &&
replMetadata.getValue().getReplicaSetId().isSet() &&
_rsConfig.getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) {
responseStatus = Status(ErrorCodes::InvalidReplicaSetConfig,
str::stream() << "replica set IDs do not match, ours: "
<< _rsConfig.getReplicaSetId()
<< "; remote node's: "
<< replMetadata.getValue().getReplicaSetId());
// Ignore metadata.
replMetadata = responseStatus;
}
if (replMetadata.isOK()) {
// Arbiters are the only nodes allowed to advance their commit point via heartbeats.
if (_getMemberState_inlock().arbiter()) {
_advanceCommitPoint_inlock(replMetadata.getValue().getLastOpCommitted());
}
// Asynchronous stepdown could happen, but it will wait for _mutex and execute
// after this function, so we cannot and don't need to wait for it to finish.
_processReplSetMetadata_inlock(replMetadata.getValue());
}
}
const Date_t now = _replExecutor->now();
Milliseconds networkTime(0);
StatusWith hbStatusResponse(hbResponse);
if (responseStatus.isOK()) {
networkTime = cbData.response.elapsedMillis.value_or(Milliseconds{0});
// TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this
// and update tests.
const auto& hbResponse = hbStatusResponse.getValue();
_updateTerm_inlock(hbResponse.getTerm());
// Postpone election timeout if we have a successful heartbeat response from the primary.
if (hbResponse.hasState() && hbResponse.getState().primary() &&
hbResponse.getTerm() == _topCoord->getTerm()) {
_cancelAndRescheduleElectionTimeout_inlock();
}
} else {
LOG_FOR_HEARTBEATS(0) << "Error in heartbeat (requestId: " << cbData.request.id << ") to "
<< target << ", response status: " << responseStatus;
hbStatusResponse = StatusWith(responseStatus);
}
HeartbeatResponseAction action =
_topCoord->processHeartbeatResponse(now, networkTime, target, hbStatusResponse);
if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() &&
hbStatusResponse.getValue().hasState() &&
hbStatusResponse.getValue().getState() != MemberState::RS_PRIMARY &&
action.getAdvancedOpTime()) {
_updateLastCommittedOpTime_inlock();
}
// Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
_signalStepDownWaiter_inlock();
// Abort catchup if we have caught up to the latest known optime after heartbeat refreshing.
if (_catchupState) {
_catchupState->signalHeartbeatUpdate_inlock();
}
_scheduleHeartbeatToTarget_inlock(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
_handleHeartbeatResponseAction_inlock(action, hbStatusResponse, std::move(lk));
}
stdx::unique_lock ReplicationCoordinatorImpl::_handleHeartbeatResponseAction_inlock(
const HeartbeatResponseAction& action,
const StatusWith& responseStatus,
stdx::unique_lock lock) {
invariant(lock.owns_lock());
switch (action.getAction()) {
case HeartbeatResponseAction::NoAction:
// Update the cached member state if different than the current topology member state
if (_memberState != _topCoord->getMemberState()) {
const PostMemberStateUpdateAction postUpdateAction =
_updateMemberStateFromTopologyCoordinator_inlock();
lock.unlock();
_performPostMemberStateUpdateAction(postUpdateAction);
lock.lock();
}
break;
case HeartbeatResponseAction::Reconfig:
invariant(responseStatus.isOK());
_scheduleHeartbeatReconfig_inlock(responseStatus.getValue().getConfig());
break;
case HeartbeatResponseAction::StartElection:
_startElectSelf_inlock();
break;
case HeartbeatResponseAction::StepDownSelf:
invariant(action.getPrimaryConfigIndex() == _selfIndex);
log() << "Stepping down from primary in response to heartbeat";
_topCoord->prepareForStepDown();
// Don't need to wait for stepdown to finish.
_stepDownStart();
break;
case HeartbeatResponseAction::StepDownRemotePrimary: {
invariant(action.getPrimaryConfigIndex() != _selfIndex);
_requestRemotePrimaryStepdown(
_rsConfig.getMemberAt(action.getPrimaryConfigIndex()).getHostAndPort());
break;
}
case HeartbeatResponseAction::PriorityTakeover: {
// Don't schedule a priority takeover if any takeover is already scheduled.
if (!_priorityTakeoverCbh.isValid() && !_catchupTakeoverCbh.isValid()) {
// Add randomized offset to calculated priority takeover delay.
Milliseconds priorityTakeoverDelay = _rsConfig.getPriorityTakeoverDelay(_selfIndex);
Milliseconds randomOffset = _getRandomizedElectionOffset_inlock();
_priorityTakeoverWhen = _replExecutor->now() + priorityTakeoverDelay + randomOffset;
log() << "Scheduling priority takeover at " << _priorityTakeoverWhen;
_priorityTakeoverCbh = _scheduleWorkAt(
_priorityTakeoverWhen,
stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1,
this,
StartElectionV1Reason::kPriorityTakeover));
}
break;
}
case HeartbeatResponseAction::CatchupTakeover: {
// Don't schedule a catchup takeover if any takeover is already scheduled.
if (!_catchupTakeoverCbh.isValid() && !_priorityTakeoverCbh.isValid()) {
Milliseconds catchupTakeoverDelay = _rsConfig.getCatchupTakeoverDelay();
_catchupTakeoverWhen = _replExecutor->now() + catchupTakeoverDelay;
log() << "Scheduling catchup takeover at " << _catchupTakeoverWhen;
_catchupTakeoverCbh = _scheduleWorkAt(
_catchupTakeoverWhen, [this](const executor::TaskExecutor::CallbackArgs& args) {
stdx::lock_guard lock(_mutex);
_cancelCatchupTakeover_inlock();
log() << "Starting an election for a catchup takeover [NOOP]";
});
}
break;
}
}
return lock;
}
namespace {
/**
* This callback is purely for logging and has no effect on any other operations
*/
void remoteStepdownCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
const Status status = cbData.response.status;
if (status == ErrorCodes::CallbackCanceled) {
return;
}
if (status.isOK()) {
LOG(1) << "stepdown of primary(" << cbData.request.target << ") succeeded with response -- "
<< cbData.response.data;
} else {
warning() << "stepdown of primary(" << cbData.request.target << ") failed due to "
<< cbData.response.status;
}
}
} // namespace
void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort& target) {
auto secondaryCatchUpPeriod(duration_cast(_rsConfig.getHeartbeatInterval() / 2));
RemoteCommandRequest request(
target,
"admin",
BSON("replSetStepDown" << 20 << "secondaryCatchUpPeriodSecs"
<< std::min(static_cast(secondaryCatchUpPeriod.count()),
20LL)),
nullptr);
log() << "Requesting " << target << " step down from primary";
auto cbh = _replExecutor->scheduleRemoteCommand(request, remoteStepdownCallback);
if (cbh.getStatus() != ErrorCodes::ShutdownInProgress) {
fassert(18808, cbh.getStatus());
}
}
executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
auto finishEvent = _makeEvent();
if (!finishEvent) {
return finishEvent;
}
_replExecutor
->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
this,
stdx::placeholders::_1,
finishEvent))
.status_with_transitional_ignore();
return finishEvent;
}
void ReplicationCoordinatorImpl::_stepDownFinish(
const executor::TaskExecutor::CallbackArgs& cbData,
const executor::TaskExecutor::EventHandle& finishedEvent) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
MONGO_FAIL_POINT_PAUSE_WHILE_SET(blockHeartbeatStepdown);
auto opCtx = cc().makeOperationContext();
Lock::GlobalWrite globalExclusiveLock(opCtx.get());
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
stdx::unique_lock lk(_mutex);
if (_topCoord->stepDownIfPending()) {
const auto action = _updateMemberStateFromTopologyCoordinator_inlock();
lk.unlock();
_performPostMemberStateUpdateAction(action);
}
_replExecutor->signalEvent(finishedEvent);
}
void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig) {
if (_inShutdown) {
return;
}
switch (_rsConfigState) {
case kConfigUninitialized:
case kConfigSteady:
LOG_FOR_HEARTBEATS(1) << "Received new config via heartbeat with version "
<< newConfig.getConfigVersion();
break;
case kConfigInitiating:
case kConfigReconfiguring:
case kConfigHBReconfiguring:
LOG_FOR_HEARTBEATS(1) << "Ignoring new configuration with version "
<< newConfig.getConfigVersion()
<< " because already in the midst of a configuration process.";
return;
case kConfigPreStart:
case kConfigStartingUp:
case kConfigReplicationDisabled:
severe() << "Reconfiguration request occurred while _rsConfigState == "
<< int(_rsConfigState) << "; aborting.";
fassertFailed(18807);
}
_setConfigState_inlock(kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
LOG_FOR_HEARTBEATS(2) << "Rescheduling heartbeat reconfig to version "
<< newConfig.getConfigVersion()
<< " to be processed after election is cancelled.";
_replExecutor
->onEvent(electionFinishedEvent,
stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
this,
stdx::placeholders::_1,
newConfig))
.status_with_transitional_ignore();
return;
}
_replExecutor
->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
this,
stdx::placeholders::_1,
newConfig))
.status_with_transitional_ignore();
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) {
if (cbd.status.code() == ErrorCodes::CallbackCanceled) {
log() << "The callback to persist the replica set configuration was canceled - "
<< "the configuration was not persisted but was used: " << newConfig.toBSON();
return;
}
const StatusWith myIndex = validateConfigForHeartbeatReconfig(
_externalState.get(), newConfig, getGlobalServiceContext());
if (myIndex.getStatus() == ErrorCodes::NodeNotFound) {
stdx::lock_guard lk(_mutex);
// If this node absent in newConfig, and this node was not previously initialized,
// return to kConfigUninitialized immediately, rather than storing the config and
// transitioning into the RS_REMOVED state. See SERVER-15740.
if (!_rsConfig.isInitialized()) {
invariant(_rsConfigState == kConfigHBReconfiguring);
LOG_FOR_HEARTBEATS(1) << "Ignoring new configuration in heartbeat response because we "
"are uninitialized and not a member of the new configuration";
_setConfigState_inlock(kConfigUninitialized);
return;
}
}
if (!myIndex.getStatus().isOK() && myIndex.getStatus() != ErrorCodes::NodeNotFound) {
warning() << "Not persisting new configuration in heartbeat response to disk because "
"it is invalid: "
<< myIndex.getStatus();
} else {
LOG_FOR_HEARTBEATS(2) << "Config with version " << newConfig.getConfigVersion()
<< " validated for reconfig; persisting to disk.";
auto opCtx = cc().makeOperationContext();
auto status = _externalState->storeLocalConfigDocument(opCtx.get(), newConfig.toBSON());
bool isFirstConfig;
{
stdx::lock_guard lk(_mutex);
isFirstConfig = !_rsConfig.isInitialized();
if (!status.isOK()) {
error() << "Ignoring new configuration in heartbeat response because we failed to"
" write it to stable storage; "
<< status;
invariant(_rsConfigState == kConfigHBReconfiguring);
if (isFirstConfig) {
_setConfigState_inlock(kConfigUninitialized);
} else {
_setConfigState_inlock(kConfigSteady);
}
return;
}
}
bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 &&
newConfig.getMemberAt(myIndex.getValue()).isArbiter();
if (!isArbiter && isFirstConfig) {
_externalState->startThreads(_settings);
_startDataReplication(opCtx.get());
}
}
LOG_FOR_HEARTBEATS(2) << "New configuration with version " << newConfig.getConfigVersion()
<< " persisted to local storage; installing new config in memory";
_heartbeatReconfigFinish(cbd, newConfig, myIndex);
}
void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
StatusWith myIndex) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
if (MONGO_FAIL_POINT(blockHeartbeatReconfigFinish)) {
LOG_FOR_HEARTBEATS(0) << "blockHeartbeatReconfigFinish fail point enabled. Rescheduling "
"_heartbeatReconfigFinish until fail point is disabled.";
_replExecutor
->scheduleWorkAt(_replExecutor->now() + Milliseconds{10},
stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
this,
stdx::placeholders::_1,
newConfig,
myIndex))
.status_with_transitional_ignore();
return;
}
auto opCtx = cc().makeOperationContext();
boost::optional globalExclusiveLock;
stdx::unique_lock lk{_mutex};
if (_memberState.primary()) {
// If we are primary, we need the global lock in MODE_X to step down. If we somehow
// transition out of primary while waiting for the global lock, there's no harm in holding
// it.
lk.unlock();
globalExclusiveLock.emplace(opCtx.get());
lk.lock();
}
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
// Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
LOG_FOR_HEARTBEATS(0)
<< "Waiting for election to complete before finishing reconfig to version "
<< newConfig.getConfigVersion();
// Wait for the election to complete and the node's Role to be set to follower.
_replExecutor
->onEvent(electionFinishedEvent,
stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
this,
stdx::placeholders::_1,
newConfig,
myIndex))
.status_with_transitional_ignore();
return;
}
if (!myIndex.isOK()) {
switch (myIndex.getStatus().code()) {
case ErrorCodes::NodeNotFound:
log() << "Cannot find self in new replica set configuration; I must be removed; "
<< myIndex.getStatus();
break;
case ErrorCodes::DuplicateKey:
error() << "Several entries in new config represent this node; "
"Removing self until an acceptable configuration arrives; "
<< myIndex.getStatus();
break;
default:
error() << "Could not validate configuration received from remote node; "
"Removing self until an acceptable configuration arrives; "
<< myIndex.getStatus();
break;
}
myIndex = StatusWith(-1);
}
const ReplSetConfig oldConfig = _rsConfig;
// If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
}
void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(
const StatusWith& handle) {
if (handle.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18912, handle.getStatus());
_heartbeatHandles.push_back(handle.getValue());
}
void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(
const executor::TaskExecutor::CallbackHandle& handle) {
const HeartbeatHandles::iterator newEnd =
std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle);
invariant(newEnd != _heartbeatHandles.end());
_heartbeatHandles.erase(newEnd, _heartbeatHandles.end());
}
void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
LOG_FOR_HEARTBEATS(2) << "Cancelling all heartbeats.";
std::for_each(
_heartbeatHandles.begin(),
_heartbeatHandles.end(),
stdx::bind(&executor::TaskExecutor::cancel, _replExecutor.get(), stdx::placeholders::_1));
// Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with
// CallbackCanceled status, so it's better to leave the handles in the list, for now.
if (_handleLivenessTimeoutCbh.isValid()) {
_replExecutor->cancel(_handleLivenessTimeoutCbh);
}
}
void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() {
_cancelHeartbeats_inlock();
_startHeartbeats_inlock();
}
void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
const Date_t now = _replExecutor->now();
_seedList.clear();
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
if (i == _selfIndex) {
continue;
}
_scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
}
_topCoord->restartHeartbeats();
if (isV1ElectionProtocol()) {
_topCoord->resetAllMemberTimeouts(_replExecutor->now());
_scheduleNextLivenessUpdate_inlock();
}
}
void ReplicationCoordinatorImpl::_handleLivenessTimeout(
const executor::TaskExecutor::CallbackArgs& cbData) {
stdx::unique_lock lk(_mutex);
// Only reset the callback handle if it matches, otherwise more will be coming through
if (cbData.myHandle == _handleLivenessTimeoutCbh) {
_handleLivenessTimeoutCbh = CallbackHandle();
}
if (!cbData.status.isOK()) {
return;
}
if (!isV1ElectionProtocol()) {
return;
}
// Scan liveness table for problems and mark nodes as down by calling into topocoord.
HeartbeatResponseAction action = _topCoord->checkMemberTimeouts(_replExecutor->now());
// Don't mind potential asynchronous stepdown as this is the last step of
// liveness check.
lk = _handleHeartbeatResponseAction_inlock(
action, makeStatusWith(), std::move(lk));
_scheduleNextLivenessUpdate_inlock();
}
void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() {
if (!isV1ElectionProtocol()) {
return;
}
// Scan liveness table for earliest date; schedule a run at (that date plus election
// timeout).
Date_t earliestDate;
int earliestMemberId;
std::tie(earliestMemberId, earliestDate) = _topCoord->getStalestLiveMember();
if (earliestMemberId == -1 || earliestDate == Date_t::max()) {
_earliestMemberId = -1;
// Nobody here but us.
return;
}
if (_handleLivenessTimeoutCbh.isValid() && !_handleLivenessTimeoutCbh.isCanceled()) {
// don't bother to schedule; one is already scheduled and pending.
return;
}
auto nextTimeout = earliestDate + _rsConfig.getElectionTimeoutPeriod();
if (nextTimeout > _replExecutor->now()) {
LOG(3) << "scheduling next check at " << nextTimeout;
auto cbh = _scheduleWorkAt(nextTimeout,
stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout,
this,
stdx::placeholders::_1));
if (!cbh) {
return;
}
_handleLivenessTimeoutCbh = cbh;
_earliestMemberId = earliestMemberId;
}
}
void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId) {
if ((_earliestMemberId != -1) && (_earliestMemberId != updatedMemberId)) {
return;
}
if (_handleLivenessTimeoutCbh.isValid()) {
_replExecutor->cancel(_handleLivenessTimeoutCbh);
}
_scheduleNextLivenessUpdate_inlock();
}
void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
if (_priorityTakeoverCbh.isValid()) {
log() << "Canceling priority takeover callback";
_replExecutor->cancel(_priorityTakeoverCbh);
_priorityTakeoverCbh = CallbackHandle();
_priorityTakeoverWhen = Date_t();
}
}
void ReplicationCoordinatorImpl::_cancelCatchupTakeover_inlock() {
if (_catchupTakeoverCbh.isValid()) {
log() << "Canceling catchup takeover callback";
_replExecutor->cancel(_catchupTakeoverCbh);
_catchupTakeoverCbh = CallbackHandle();
_catchupTakeoverWhen = Date_t();
}
}
void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
if (_handleElectionTimeoutCbh.isValid()) {
LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen;
_replExecutor->cancel(_handleElectionTimeoutCbh);
_handleElectionTimeoutCbh = CallbackHandle();
_handleElectionTimeoutWhen = Date_t();
}
if (_inShutdown) {
return;
}
if (!isV1ElectionProtocol()) {
return;
}
if (!_memberState.secondary()) {
return;
}
if (_selfIndex < 0) {
return;
}
if (!_rsConfig.getMemberAt(_selfIndex).isElectable()) {
return;
}
Milliseconds randomOffset = _getRandomizedElectionOffset_inlock();
auto now = _replExecutor->now();
auto when = now + _rsConfig.getElectionTimeoutPeriod() + randomOffset;
invariant(when > now);
LOG(4) << "Scheduling election timeout callback at " << when;
_handleElectionTimeoutWhen = when;
_handleElectionTimeoutCbh =
_scheduleWorkAt(when,
stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1,
this,
StartElectionV1Reason::kElectionTimeout));
}
void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(StartElectionV1Reason reason) {
if (!isV1ElectionProtocol()) {
return;
}
stdx::lock_guard lock(_mutex);
// We should always reschedule this callback even if we do not make it to the election
// process.
{
_cancelCatchupTakeover_inlock();
_cancelPriorityTakeover_inlock();
_cancelAndRescheduleElectionTimeout_inlock();
if (_inShutdown) {
log() << "Not starting an election, since we are shutting down";
return;
}
}
const auto status = _topCoord->becomeCandidateIfElectable(
_replExecutor->now(), reason == StartElectionV1Reason::kPriorityTakeover);
if (!status.isOK()) {
switch (reason) {
case StartElectionV1Reason::kElectionTimeout:
log() << "Not starting an election, since we are not electable due to: "
<< status.reason();
break;
case StartElectionV1Reason::kPriorityTakeover:
log() << "Not starting an election for a priority takeover, "
<< "since we are not electable due to: " << status.reason();
break;
case StartElectionV1Reason::kStepUpRequest:
log() << "Not starting an election for a replSetStepUp request, "
<< "since we are not electable due to: " << status.reason();
break;
}
return;
}
switch (reason) {
case StartElectionV1Reason::kElectionTimeout:
log() << "Starting an election, since we've seen no PRIMARY in the past "
<< _rsConfig.getElectionTimeoutPeriod();
break;
case StartElectionV1Reason::kPriorityTakeover:
log() << "Starting an election for a priority takeover";
break;
case StartElectionV1Reason::kStepUpRequest:
log() << "Starting an election due to step up request";
break;
}
_startElectSelfV1_inlock();
}
} // namespace repl
} // namespace mongo