/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include #include "mongo/base/status.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/heartbeat_response_action.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config_checks.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" namespace mongo { namespace repl { namespace { typedef ReplicationExecutor::CallbackHandle CBHandle; } // namespace using executor::RemoteCommandRequest; void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { LockGuard topoLock(_topoMutex); _untrackHeartbeatHandle(cbData.myHandle); if (cbData.status == ErrorCodes::CallbackCanceled) { return; } const Date_t now = _replExecutor.now(); BSONObj heartbeatObj; Milliseconds timeout(0); if (isV1ElectionProtocol()) { const std::pair hbRequest = _topCoord->prepareHeartbeatRequestV1(now, _settings.ourSetName(), target); heartbeatObj = hbRequest.first.toBSON(); timeout = hbRequest.second; } else { const std::pair hbRequest = _topCoord->prepareHeartbeatRequest(now, _settings.ourSetName(), target); heartbeatObj = hbRequest.first.toBSON(); timeout = hbRequest.second; } const RemoteCommandRequest request( target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), timeout); const ReplicationExecutor::RemoteCommandCallbackFn callback = stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, this, stdx::placeholders::_1, targetIndex); _trackHeartbeatHandle(_replExecutor.scheduleRemoteCommand(request, callback)); } void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when) { LOG(2) << "Scheduling heartbeat to " << target << " at " << dateToISOStringUTC(when); _trackHeartbeatHandle( _replExecutor.scheduleWorkAt(when, stdx::bind(&ReplicationCoordinatorImpl::_doMemberHeartbeat, this, stdx::placeholders::_1, target, targetIndex))); } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { LockGuard topoLock(_topoMutex); // remove handle from queued heartbeats _untrackHeartbeatHandle(cbData.myHandle); // Parse and validate the response. At the end of this step, if responseStatus is OK then // hbResponse is valid. Status responseStatus = cbData.response.getStatus(); if (responseStatus == ErrorCodes::CallbackCanceled) { return; } const HostAndPort& target = cbData.request.target; ReplSetHeartbeatResponse hbResponse; BSONObj resp; if (responseStatus.isOK()) { resp = cbData.response.getValue().data; responseStatus = hbResponse.initialize(resp, _topCoord->getTerm()); StatusWith replMetadata = rpc::ReplSetMetadata::readFromMetadata(cbData.response.getValue().metadata); // Reject heartbeat responses (and metadata) from nodes with mismatched replica set IDs. // It is problematic to perform this check in the heartbeat reconfiguring logic because it // is possible for two mismatched replica sets to have the same replica set name and // configuration version. A heartbeat reconfiguration would not take place in that case. // Additionally, this is where we would stop further processing of the metadata from an // unknown replica set. if (replMetadata.isOK() && _rsConfig.isInitialized() && _rsConfig.hasReplicaSetId() && replMetadata.getValue().getReplicaSetId().isSet() && _rsConfig.getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) { responseStatus = Status(ErrorCodes::InvalidReplicaSetConfig, str::stream() << "replica set IDs do not match, ours: " << _rsConfig.getReplicaSetId() << "; remote node's: " << replMetadata.getValue().getReplicaSetId()); // Ignore metadata. replMetadata = responseStatus; } if (replMetadata.isOK()) { // Asynchronous stepdown could happen, but it will be queued in executor after // this function, so we cannot and don't need to wait for it to finish. _processReplSetMetadata_incallback(replMetadata.getValue()); } } const Date_t now = _replExecutor.now(); const OpTime lastApplied = getMyLastAppliedOpTime(); // Locks and unlocks _mutex. Milliseconds networkTime(0); StatusWith hbStatusResponse(hbResponse); if (responseStatus.isOK()) { networkTime = cbData.response.getValue().elapsedMillis; // TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this // and update tests. _updateTerm_incallback(hbStatusResponse.getValue().getTerm()); // Postpone election timeout if we have a successful heartbeat response from the primary. const auto& hbResponse = hbStatusResponse.getValue(); if (hbResponse.hasState() && hbResponse.getState().primary()) { cancelAndRescheduleElectionTimeout(); } } else { log() << "Error in heartbeat request to " << target << "; " << responseStatus; if (!resp.isEmpty()) { LOG(3) << "heartbeat response: " << resp; } hbStatusResponse = StatusWith(responseStatus); } HeartbeatResponseAction action = _topCoord->processHeartbeatResponse( now, networkTime, target, hbStatusResponse, lastApplied); if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() && targetIndex >= 0 && hbStatusResponse.getValue().hasState() && hbStatusResponse.getValue().getState() != MemberState::RS_PRIMARY) { ReplSetHeartbeatResponse hbResp = hbStatusResponse.getValue(); if (hbResp.hasAppliedOpTime()) { stdx::unique_lock lk(_mutex); if (hbResp.getConfigVersion() == _rsConfig.getConfigVersion()) { _updateOpTimesFromHeartbeat_inlock( targetIndex, hbResp.hasDurableOpTime() ? hbResp.getDurableOpTime() : OpTime(), hbResp.getAppliedOpTime()); // TODO: Enable with Data Replicator // lk.unlock(); //_dr.slavesHaveProgressed(); } } } // In case our updated OpTime allows a waiter to finish stepping down, we wake all the waiters. _signalStepDownWaiters(); _scheduleHeartbeatToTarget( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); _handleHeartbeatResponseAction(action, hbStatusResponse); } void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex, const OpTime& durableOpTime, const OpTime& appliedOpTime) { invariant(_selfIndex >= 0); invariant(targetIndex >= 0); SlaveInfo& slaveInfo = _slaveInfo[targetIndex]; if (appliedOpTime > slaveInfo.lastAppliedOpTime) { _updateSlaveInfoAppliedOpTime_inlock(&slaveInfo, appliedOpTime); } if (durableOpTime > slaveInfo.lastDurableOpTime) { _updateSlaveInfoDurableOpTime_inlock(&slaveInfo, durableOpTime); } } void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( const HeartbeatResponseAction& action, const StatusWith& responseStatus) { switch (action.getAction()) { case HeartbeatResponseAction::NoAction: // Update the cached member state if different than the current topology member state if (_memberState != _topCoord->getMemberState()) { stdx::unique_lock lk(_mutex); const PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock(); lk.unlock(); _performPostMemberStateUpdateAction(postUpdateAction); } break; case HeartbeatResponseAction::Reconfig: invariant(responseStatus.isOK()); _scheduleHeartbeatReconfig(responseStatus.getValue().getConfig()); break; case HeartbeatResponseAction::StartElection: _startElectSelf(); break; case HeartbeatResponseAction::StepDownSelf: invariant(action.getPrimaryConfigIndex() == _selfIndex); log() << "Stepping down from primary in response to heartbeat"; _topCoord->prepareForStepDown(); // Don't need to wait for stepdown to finish. _stepDownStart(); break; case HeartbeatResponseAction::StepDownRemotePrimary: { invariant(action.getPrimaryConfigIndex() != _selfIndex); _requestRemotePrimaryStepdown( _rsConfig.getMemberAt(action.getPrimaryConfigIndex()).getHostAndPort()); break; } case HeartbeatResponseAction::PriorityTakeover: { stdx::unique_lock lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { _priorityTakeoverWhen = _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex); log() << "Scheduling priority takeover at " << _priorityTakeoverWhen; _priorityTakeoverCbh = _scheduleWorkAt( _priorityTakeoverWhen, stdx::bind( &ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this, true)); } break; } } } namespace { /** * This callback is purely for logging and has no effect on any other operations */ void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) { const Status status = cbData.response.getStatus(); if (status == ErrorCodes::CallbackCanceled) { return; } if (status.isOK()) { LOG(1) << "stepdown of primary(" << cbData.request.target << ") succeeded with response -- " << cbData.response.getValue().data; } else { warning() << "stepdown of primary(" << cbData.request.target << ") failed due to " << cbData.response.getStatus(); } } } // namespace void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort& target) { RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 20)); log() << "Requesting " << target << " step down from primary"; CBHStatus cbh = _replExecutor.scheduleRemoteCommand(request, remoteStepdownCallback); if (cbh.getStatus() != ErrorCodes::ShutdownInProgress) { fassert(18808, cbh.getStatus()); } } ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { auto finishEvent = _makeEvent(); if (!finishEvent) { return finishEvent; } _replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind( &ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent)); return finishEvent; } void ReplicationCoordinatorImpl::_stepDownFinish( const ReplicationExecutor::CallbackArgs& cbData, const ReplicationExecutor::EventHandle& finishedEvent) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } LockGuard topoLock(_topoMutex); invariant(cbData.txn); // TODO Add invariant that we've got global shared or global exclusive lock, when supported // by lock manager. stdx::unique_lock lk(_mutex); _topCoord->stepDownIfPending(); const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); lk.unlock(); _performPostMemberStateUpdateAction(action); _replExecutor.signalEvent(finishedEvent); } void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig) { stdx::lock_guard lk(_mutex); if (_inShutdown) { return; } switch (_rsConfigState) { case kConfigUninitialized: case kConfigSteady: LOG(1) << "Received new config via heartbeat with version " << newConfig.getConfigVersion(); break; case kConfigInitiating: case kConfigReconfiguring: case kConfigHBReconfiguring: LOG(1) << "Ignoring new configuration with version " << newConfig.getConfigVersion() << " because already in the midst of a configuration process"; return; case kConfigPreStart: case kConfigStartingUp: case kConfigReplicationDisabled: severe() << "Reconfiguration request occurred while _rsConfigState == " << int(_rsConfigState) << "; aborting."; fassertFailed(18807); } _setConfigState_inlock(kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersion() < newConfig.getConfigVersion()); if (_freshnessChecker) { _freshnessChecker->cancel(); if (_electCmdRunner) { _electCmdRunner->cancel(); } _replExecutor.onEvent( _electionFinishedEvent, stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled, this, stdx::placeholders::_1, newConfig)); return; } _replExecutor.scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, this, stdx::placeholders::_1, newConfig)); } void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled( const ReplicationExecutor::CallbackArgs& cbData, const ReplicaSetConfig& newConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } LockGuard topoLock(_topoMutex); fassert(18911, cbData.status); stdx::lock_guard lk(_mutex); if (_inShutdown) { return; } _replExecutor.scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, this, stdx::placeholders::_1, newConfig)); } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( const ReplicationExecutor::CallbackArgs& cbd, const ReplicaSetConfig& newConfig) { if (cbd.status.code() == ErrorCodes::CallbackCanceled) { log() << "The callback to persist the replica set configuration was canceled - " << "the configuration was not persisted but was used: " << newConfig.toBSON(); return; } stdx::unique_lock lk(_mutex, stdx::defer_lock); const StatusWith myIndex = validateConfigForHeartbeatReconfig(_externalState.get(), newConfig); if (myIndex.getStatus() == ErrorCodes::NodeNotFound) { lk.lock(); // If this node absent in newConfig, and this node was not previously initialized, // return to kConfigUninitialized immediately, rather than storing the config and // transitioning into the RS_REMOVED state. See SERVER-15740. if (!_rsConfig.isInitialized()) { invariant(_rsConfigState == kConfigHBReconfiguring); LOG(1) << "Ignoring new configuration in heartbeat response because we are " "uninitialized and not a member of the new configuration"; _setConfigState_inlock(kConfigUninitialized); return; } lk.unlock(); } if (!myIndex.getStatus().isOK() && myIndex.getStatus() != ErrorCodes::NodeNotFound) { warning() << "Not persisting new configuration in heartbeat response to disk because " "it is invalid: " << myIndex.getStatus(); } else { Status status = _externalState->storeLocalConfigDocument(cbd.txn, newConfig.toBSON()); lk.lock(); if (!status.isOK()) { error() << "Ignoring new configuration in heartbeat response because we failed to" " write it to stable storage; " << status; invariant(_rsConfigState == kConfigHBReconfiguring); if (_rsConfig.isInitialized()) { _setConfigState_inlock(kConfigSteady); } else { _setConfigState_inlock(kConfigUninitialized); } return; } auto isFirstConfig = !_rsConfig.isInitialized(); lk.unlock(); bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && newConfig.getMemberAt(myIndex.getValue()).isArbiter(); if (!isArbiter && isFirstConfig) { _externalState->startThreads(_settings); _startDataReplication(cbd.txn); } } const CallbackFn reconfigFinishFn( stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, this, stdx::placeholders::_1, newConfig, myIndex)); // Make sure that the reconfigFinishFn doesn't finish until we've reset // _heartbeatReconfigThread. lk.lock(); if (_memberState.primary()) { // If the primary is receiving a heartbeat reconfig, that strongly suggests // that there has been a force reconfiguration. In any event, it might lead // to this node stepping down as primary, so we'd better do it with the global // lock. _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); } else { _replExecutor.scheduleWork(reconfigFinishFn); } } void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const ReplicationExecutor::CallbackArgs& cbData, const ReplicaSetConfig& newConfig, StatusWith myIndex) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } LockGuard topoLock(_topoMutex); stdx::unique_lock lk(_mutex); invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersion() < newConfig.getConfigVersion()); if (_getMemberState_inlock().primary() && !cbData.txn) { // Not having an OperationContext in the CallbackData means we definitely aren't holding // the global lock. Since we're primary and this reconfig could cause us to stepdown, // reschedule this work with the global exclusive lock so the stepdown is safe. // TODO(spencer): When we *do* have an OperationContext, consult it to confirm that // we are indeed holding the global lock. _replExecutor.scheduleWorkWithGlobalExclusiveLock( stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, this, stdx::placeholders::_1, newConfig, myIndex)); return; } // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { if (isV1ElectionProtocol()) { invariant(_voteRequester); _voteRequester->cancel(); } else { invariant(_freshnessChecker); _freshnessChecker->cancel(); if (_electCmdRunner) { _electCmdRunner->cancel(); } } // Wait for the election to complete and the node's Role to be set to follower. _replExecutor.onEvent(_electionFinishedEvent, stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, this, stdx::placeholders::_1, newConfig, myIndex)); return; } if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { case ErrorCodes::NodeNotFound: log() << "Cannot find self in new replica set configuration; I must be removed; " << myIndex.getStatus(); break; case ErrorCodes::DuplicateKey: error() << "Several entries in new config represent this node; " "Removing self until an acceptable configuration arrives; " << myIndex.getStatus(); break; default: error() << "Could not validate configuration received from remote node; " "Removing self until an acceptable configuration arrives; " << myIndex.getStatus(); break; } myIndex = StatusWith(-1); } const ReplicaSetConfig oldConfig = _rsConfig; // If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to // the data structures inside of the TopologyCoordinator. const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1; const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue); lk.unlock(); _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); _performPostMemberStateUpdateAction(action); } void ReplicationCoordinatorImpl::_trackHeartbeatHandle(const StatusWith& handle) { if (handle.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18912, handle.getStatus()); _heartbeatHandles.push_back(handle.getValue()); } void ReplicationCoordinatorImpl::_untrackHeartbeatHandle(const CBHandle& handle) { const HeartbeatHandles::iterator newEnd = std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle); invariant(newEnd != _heartbeatHandles.end()); _heartbeatHandles.erase(newEnd, _heartbeatHandles.end()); } void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { std::for_each(_heartbeatHandles.begin(), _heartbeatHandles.end(), stdx::bind(&ReplicationExecutor::cancel, &_replExecutor, stdx::placeholders::_1)); // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with // CallbackCanceled status, so it's better to leave the handles in the list, for now. if (_handleLivenessTimeoutCbh.isValid()) { _replExecutor.cancel(_handleLivenessTimeoutCbh); } } void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() { _cancelHeartbeats_inlock(); _startHeartbeats_inlock(); } void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { const Date_t now = _replExecutor.now(); _seedList.clear(); for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { if (i == _selfIndex) { continue; } _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); } if (isV1ElectionProtocol()) { for (auto&& slaveInfo : _slaveInfo) { slaveInfo.lastUpdate = _replExecutor.now(); slaveInfo.down = false; } _scheduleNextLivenessUpdate_inlock(); } } void ReplicationCoordinatorImpl::_handleLivenessTimeout( const ReplicationExecutor::CallbackArgs& cbData) { LockGuard topoLock(_topoMutex); stdx::lock_guard lk(_mutex); // Only reset the callback handle if it matches, otherwise more will be coming through if (cbData.myHandle == _handleLivenessTimeoutCbh) { _handleLivenessTimeoutCbh = CallbackHandle(); } if (!cbData.status.isOK()) { return; } if (!isV1ElectionProtocol()) { return; } // Scan liveness table for problems and mark nodes as down by calling into topocoord. auto now(_replExecutor.now()); for (auto&& slaveInfo : _slaveInfo) { if (slaveInfo.self) { continue; } if (slaveInfo.down) { continue; } if (now - slaveInfo.lastUpdate >= _rsConfig.getElectionTimeoutPeriod()) { int memberIndex = _rsConfig.findMemberIndexByConfigId(slaveInfo.memberId); if (memberIndex == -1) { continue; } slaveInfo.down = true; if (_memberState.primary()) { // Only adjust hbdata if we are primary, since only the primary has a full view // of the entire cluster. // Secondaries might not see other secondaries in the cluster if they are not // downstream. HeartbeatResponseAction action = _topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock()); // Don't mind potential asynchronous stepdown as this is the last step of // liveness check. _handleHeartbeatResponseAction(action, makeStatusWith()); } } } _scheduleNextLivenessUpdate_inlock(); } void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() { if (!isV1ElectionProtocol()) { return; } // Scan liveness table for earliest date; schedule a run at (that date plus election // timeout). Date_t earliestDate = Date_t::max(); int earliestMemberId = -1; for (auto&& slaveInfo : _slaveInfo) { if (slaveInfo.self) { continue; } if (slaveInfo.down) { // Already down. continue; } LOG(3) << "slaveinfo lastupdate is: " << slaveInfo.lastUpdate; if (earliestDate > slaveInfo.lastUpdate) { earliestDate = slaveInfo.lastUpdate; earliestMemberId = slaveInfo.memberId; } } LOG(3) << "earliest member " << earliestMemberId << " date: " << earliestDate; if (earliestMemberId == -1 || earliestDate == Date_t::max()) { _earliestMemberId = -1; // Nobody here but us. return; } if (_handleLivenessTimeoutCbh.isValid() && !_handleLivenessTimeoutCbh.isCanceled()) { // don't bother to schedule; one is already scheduled and pending. return; } auto nextTimeout = earliestDate + _rsConfig.getElectionTimeoutPeriod(); if (nextTimeout > _replExecutor.now()) { LOG(3) << "scheduling next check at " << nextTimeout; auto cbh = _scheduleWorkAt(nextTimeout, stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout, this, stdx::placeholders::_1)); if (!cbh) { return; } _handleLivenessTimeoutCbh = cbh; _earliestMemberId = earliestMemberId; } } void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId) { if ((_earliestMemberId != -1) && (_earliestMemberId != updatedMemberId)) { return; } if (_handleLivenessTimeoutCbh.isValid()) { _replExecutor.cancel(_handleLivenessTimeoutCbh); } _scheduleNextLivenessUpdate_inlock(); } void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() { if (_priorityTakeoverCbh.isValid()) { log() << "Canceling priority takeover callback"; _replExecutor.cancel(_priorityTakeoverCbh); _priorityTakeoverCbh = CallbackHandle(); _priorityTakeoverWhen = Date_t(); } } void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { if (_handleElectionTimeoutCbh.isValid()) { LOG(4) << "Canceling election timeout callback at " << _handleElectionTimeoutWhen; _replExecutor.cancel(_handleElectionTimeoutCbh); _handleElectionTimeoutCbh = CallbackHandle(); _handleElectionTimeoutWhen = Date_t(); } if (!isV1ElectionProtocol()) { return; } if (!_memberState.secondary()) { return; } if (_selfIndex < 0) { return; } if (!_rsConfig.getMemberAt(_selfIndex).isElectable()) { return; } Milliseconds randomOffset = Milliseconds(_replExecutor.nextRandomInt64( durationCount(_rsConfig.getElectionTimeoutPeriod()) * _externalState->getElectionTimeoutOffsetLimitFraction())); auto now = _replExecutor.now(); auto when = now + _rsConfig.getElectionTimeoutPeriod() + randomOffset; invariant(when > now); LOG(4) << "Scheduling election timeout callback at " << when; _handleElectionTimeoutWhen = when; _handleElectionTimeoutCbh = _scheduleWorkAt( when, stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this, false)); } void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(bool isPriorityTakeOver) { LockGuard topoLock(_topoMutex); if (!isV1ElectionProtocol()) { return; } // We should always reschedule this callback even if we do not make it to the election // process. { stdx::lock_guard lock(_mutex); _cancelPriorityTakeover_inlock(); _cancelAndRescheduleElectionTimeout_inlock(); } const auto status = _topCoord->becomeCandidateIfElectable(_replExecutor.now(), getMyLastAppliedOpTime()); if (!status.isOK()) { if (isPriorityTakeOver) { log() << "Not starting an election for a priority takeover, " << "since we are not electable due to: " << status.reason(); } else { log() << "Not starting an election, since we are not electable due to: " << status.reason(); } return; } if (isPriorityTakeOver) { log() << "Starting an election for a priority takeover"; } else { stdx::lock_guard lock(_mutex); log() << "Starting an election, since we've seen no PRIMARY in the past " << _rsConfig.getElectionTimeoutPeriod(); } _startElectSelfV1(); } } // namespace repl } // namespace mongo