/** * Copyright 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include #include "mongo/db/audit.h" #include "mongo/db/client_basic.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/heartbeat_response_action.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_html_summary.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rslog.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace repl { using std::vector; const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30); namespace { template int indexOfIterator(const std::vector& vec, typename std::vector::const_iterator& it) { return static_cast(it - vec.begin()); } // Maximum number of retries for a failed heartbeat. const int kMaxHeartbeatRetries = 2; /** * Returns true if the only up heartbeats are auth errors. 
*/ bool _hasOnlyAuthErrorUpHeartbeats(const std::vector& hbdata, const int selfIndex) { bool foundAuthError = false; for (std::vector::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) { if (indexOfIterator(hbdata, it) == selfIndex) { continue; } if (it->up()) { return false; } if (it->hasAuthIssue()) { foundAuthError = true; } } return foundAuthError; } void appendOpTime(BSONObjBuilder* bob, const char* elemName, const OpTime& opTime, const long long pv) { if (pv == 1) { opTime.append(bob, elemName); } else { bob->append(elemName, opTime.getTimestamp()); } } } // namespace void PingStats::start(Date_t now) { _lastHeartbeatStartDate = now; _numFailuresSinceLastStart = 0; } void PingStats::hit(Milliseconds millis) { _numFailuresSinceLastStart = std::numeric_limits::max(); ++count; value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5); } void PingStats::miss() { ++_numFailuresSinceLastStart; } TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options) : _role(Role::follower), _term(OpTime::kUninitializedTerm), _currentPrimaryIndex(-1), _forceSyncSourceIndex(-1), _options(std::move(options)), _selfIndex(-1), _stepDownPending(false), _maintenanceModeCalls(0), _followerMode(MemberState::RS_STARTUP2) { invariant(getMemberState() == MemberState::RS_STARTUP); } TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const { return _role; } void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) { invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); _forceSyncSourceIndex = index; } HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { return _syncSource; } HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampApplied) { // If we are primary, then we aren't syncing from anyone (else). if (_iAmPrimary()) { return HostAndPort(); } // If we are not a member of the current replica set configuration, no sync source is valid. 
if (_selfIndex == -1) { LOG(2) << "Cannot sync from any members because we are not in the replica set config"; return HostAndPort(); } // if we have a target we've requested to sync from, use it if (_forceSyncSourceIndex != -1) { invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); _forceSyncSourceIndex = -1; std::string msg(str::stream() << "syncing from: " << _syncSource.toString() << " by request"); log() << msg << rsLog; setMyHeartbeatMessage(now, msg); return _syncSource; } // wait for 2N pings (not counting ourselves) before choosing a sync target int needMorePings = (_hbdata.size() - 1) * 2 - _getTotalPings(); if (needMorePings > 0) { OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing"; _syncSource = HostAndPort(); return _syncSource; } // If we are only allowed to sync from the primary, set that if (!_rsConfig.isChainingAllowed()) { if (_currentPrimaryIndex == -1) { LOG(1) << "Cannot select sync source because chaining is" " not allowed and primary is unknown/down"; _syncSource = HostAndPort(); return _syncSource; } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { LOG(1) << "Cannot select sync source because chaining is" "not allowed and primary is not currently accepting our updates"; _syncSource = HostAndPort(); return _syncSource; } else { _syncSource = _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort(); std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString()); log() << msg << rsLog; setMyHeartbeatMessage(now, msg); return _syncSource; } } // find the member with the lowest ping time that is ahead of me // choose a time that will exclude no candidates by default, in case we don't see a primary OpTime oldestSyncOpTime; // Find primary's oplog time. Reject sync candidates that are more than // _options.maxSyncSourceLagSecs seconds behind. 
if (_currentPrimaryIndex != -1) { OpTime primaryOpTime = _hbdata.at(_currentPrimaryIndex).getAppliedOpTime(); // Check if primaryOpTime is still close to 0 because we haven't received // our first heartbeat from a new primary yet. unsigned int maxLag = static_cast(durationCount(_options.maxSyncSourceLagSecs)); if (primaryOpTime.getSecs() >= maxLag) { oldestSyncOpTime = OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm()); } } int closestIndex = -1; // Make two attempts, with less restrictive rules the second time. // // During the first attempt, we ignore those nodes that have a larger slave // delay, hidden nodes or non-voting, and nodes that are excessively behind. // // For the second attempt include those nodes, in case those are the only ones we can reach. // // This loop attempts to set 'closestIndex', to select a viable candidate. for (int attempts = 0; attempts < 2; ++attempts) { for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); // Don't consider ourselves. if (itIndex == _selfIndex) { continue; } // Candidate must be up to be considered. if (!it->up()) { continue; } // Candidate must be PRIMARY or SECONDARY state to be considered. if (!it->getState().readable()) { continue; } const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex)); // Things to skip on the first attempt. if (attempts == 0) { // Candidate must be a voter if we are a voter. if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) { continue; } // Candidates must not be hidden. if (itMemberConfig.isHidden()) { continue; } // Candidates cannot be excessively behind. if (it->getAppliedOpTime() < oldestSyncOpTime) { continue; } // Candidate must not have a configured delay larger than ours. if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) { continue; } } // Candidate must build indexes if we build indexes, to be considered. 
if (_selfConfig().shouldBuildIndexes()) { if (!itMemberConfig.shouldBuildIndexes()) { continue; } } // only consider candidates that are ahead of where we are if (it->getAppliedOpTime().getTimestamp() <= lastTimestampApplied) { continue; } // Candidate cannot be more latent than anything we've already considered. if ((closestIndex != -1) && (_getPing(itMemberConfig.getHostAndPort()) > _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) { continue; } // Candidate cannot be blacklisted. if (_memberIsBlacklisted(itMemberConfig, now)) { continue; } // This candidate has passed all tests; set 'closestIndex' closestIndex = itIndex; } if (closestIndex != -1) break; // no need for second attempt } if (closestIndex == -1) { // Did not find any members to sync from std::string msg("could not find member to sync from"); // Only log when we had a valid sync source before if (!_syncSource.empty()) { log() << msg << rsLog; } setMyHeartbeatMessage(now, msg); _syncSource = HostAndPort(); return _syncSource; } _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); log() << msg << rsLog; setMyHeartbeatMessage(now, msg); return _syncSource; } bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const { std::map::const_iterator blacklisted = _syncSourceBlacklist.find(memberConfig.getHostAndPort()); if (blacklisted != _syncSourceBlacklist.end()) { if (blacklisted->second > now) { return true; } } return false; } void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { LOG(2) << "blacklisting " << host << " until " << until.toString(); _syncSourceBlacklist[host] = until; } void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) { std::map::iterator hostItr = _syncSourceBlacklist.find(host); if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) { LOG(2) << 
"unblacklisting " << host; _syncSourceBlacklist.erase(hostItr); } } void TopologyCoordinatorImpl::clearSyncSourceBlacklist() { _syncSourceBlacklist.clear(); } void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result) { response->append("syncFromRequested", target.toString()); if (_selfIndex == -1) { *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync"); return; } const MemberConfig& selfConfig = _selfConfig(); if (selfConfig.isArbiter()) { *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync"); return; } if (_selfIndex == _currentPrimaryIndex) { *result = Status(ErrorCodes::NotSecondary, "primaries don't sync"); return; } ReplicaSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd(); int targetIndex = 0; for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); ++it) { if (it->getHostAndPort() == target) { targetConfig = it; break; } ++targetIndex; } if (targetConfig == _rsConfig.membersEnd()) { *result = Status(ErrorCodes::NodeNotFound, str::stream() << "Could not find member \"" << target.toString() << "\" in replica set"); return; } if (targetIndex == _selfIndex) { *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself"); return; } if (targetConfig->isArbiter()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it is an arbiter"); return; } if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it does not build indexes"); return; } if (selfConfig.isVoter() && !targetConfig->isVoter()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it is not a voter"); return; } const 
MemberHeartbeatData& hbdata = _hbdata.at(targetIndex); if (hbdata.hasAuthIssue()) { *result = Status(ErrorCodes::Unauthorized, str::stream() << "not authorized to communicate with " << target.toString()); return; } if (hbdata.getHealth() == 0) { *result = Status(ErrorCodes::HostUnreachable, str::stream() << "I cannot reach the requested member: " << target.toString()); return; } if (hbdata.getAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { warning() << "attempting to sync from " << target << ", but its latest opTime is " << hbdata.getAppliedOpTime().getSecs() << " and ours is " << lastOpApplied.getSecs() << " so this may not work"; response->append("warning", str::stream() << "requested member \"" << target.toString() << "\" is more than 10 seconds behind us"); // not returning bad Status, just warning } HostAndPort prevSyncSource = getSyncSourceAddress(); if (!prevSyncSource.empty()) { response->append("prevSyncTarget", prevSyncSource.toString()); } setForceSyncSourceIndex(targetIndex); *result = Status::OK(); } void TopologyCoordinatorImpl::prepareFreshResponse( const ReplicationCoordinator::ReplSetFreshArgs& args, const Date_t now, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << _rsConfig.getProtocolVersion()); return; } if (_selfIndex == -1) { *result = Status(ErrorCodes::ReplicaSetNotFound, "Cannot participate in elections because not initialized"); return; } if (args.setName != _rsConfig.getReplSetName()) { *result = Status(ErrorCodes::ReplicaSetNotFound, str::stream() << "Wrong repl set name. 
Expected: " << _rsConfig.getReplSetName() << ", received: " << args.setName); return; } if (args.id == static_cast(_selfConfig().getId())) { *result = Status(ErrorCodes::BadValue, str::stream() << "Received replSetFresh command from member with the " "same member ID as ourself: " << args.id); return; } bool weAreFresher = false; if (_rsConfig.getConfigVersion() > args.cfgver) { log() << "replSet member " << args.who << " is not yet aware its cfg version " << args.cfgver << " is stale"; response->append("info", "config version stale"); weAreFresher = true; } // check not only our own optime, but any other member we can reach else if (OpTime(args.opTime, _term) < _latestKnownOpTime(lastOpApplied)) { weAreFresher = true; } response->appendDate("opTime", Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL())); response->append("fresher", weAreFresher); std::string errmsg; bool doVeto = _shouldVetoMember(args, now, lastOpApplied, &errmsg); response->append("veto", doVeto); if (doVeto) { response->append("errmsg", errmsg); } *result = Status::OK(); } bool TopologyCoordinatorImpl::_shouldVetoMember( const ReplicationCoordinator::ReplSetFreshArgs& args, const Date_t& now, const OpTime& lastOpApplied, std::string* errmsg) const { if (_rsConfig.getConfigVersion() < args.cfgver) { // We are stale; do not veto. 
return false; } const unsigned int memberID = args.id; const int hopefulIndex = _getMemberIndex(memberID); invariant(hopefulIndex != _selfIndex); const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied); if (hopefulIndex == -1) { *errmsg = str::stream() << "replSet couldn't find member with id " << memberID; return true; } if (_iAmPrimary() && lastOpApplied >= _hbdata.at(hopefulIndex).getAppliedOpTime()) { // hbinfo is not updated for ourself, so if we are primary we have to check the // primary's last optime separately *errmsg = str::stream() << "I am already primary, " << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " can try again once I've stepped down"; return true; } if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) && (_hbdata.at(_currentPrimaryIndex).getAppliedOpTime() >= _hbdata.at(hopefulIndex).getAppliedOpTime())) { // other members might be aware of more up-to-date nodes *errmsg = str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is trying to elect itself but " << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() << " is already primary and more up-to-date"; return true; } if ((highestPriorityIndex != -1)) { const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex); const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex); if (priorityMember.getPriority() > hopefulMember.getPriority()) { *errmsg = str::stream() << hopefulMember.getHostAndPort().toString() << " has lower priority of " << hopefulMember.getPriority() << " than " << priorityMember.getHostAndPort().toString() << " which has a priority of " << priorityMember.getPriority(); return true; } } UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex, lastOpApplied); reason &= ~RefusesToStand; if (reason) { *errmsg = str::stream() << "I don't think " << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " 
is electable because the " << _getUnelectableReasonString(reason); return true; } return false; } // produce a reply to a received electCmd void TopologyCoordinatorImpl::prepareElectResponse( const ReplicationCoordinator::ReplSetElectArgs& args, const Date_t now, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << _rsConfig.getProtocolVersion()); return; } if (_selfIndex == -1) { *result = Status(ErrorCodes::ReplicaSetNotFound, "Cannot participate in election because not initialized"); return; } const long long myver = _rsConfig.getConfigVersion(); const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied); const MemberConfig* primary = _currentPrimaryMember(); const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid); const MemberConfig* highestPriority = highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex); int vote = 0; if (args.set != _rsConfig.getReplSetName()) { log() << "replSet error received an elect request for '" << args.set << "' but our set name is '" << _rsConfig.getReplSetName() << "'"; } else if (myver < args.cfgver) { // we are stale. don't vote log() << "replSetElect not voting because our config version is stale. Our version: " << myver << ", their version: " << args.cfgver; } else if (myver > args.cfgver) { // they are stale! log() << "replSetElect command received stale config version # during election. 
" "Our version: " << myver << ", their version: " << args.cfgver; vote = -10000; } else if (!hopeful) { log() << "replSetElect couldn't find member with id " << args.whoid; vote = -10000; } else if (_iAmPrimary()) { log() << "I am already primary, " << hopeful->getHostAndPort().toString() << " can try again once I've stepped down"; vote = -10000; } else if (primary) { log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but " << primary->getHostAndPort().toString() << " is already primary"; vote = -10000; } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) { // TODO(spencer): What if the lower-priority member is more up-to-date? log() << hopeful->getHostAndPort().toString() << " has lower priority than " << highestPriority->getHostAndPort().toString(); vote = -10000; } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) { log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for " << _voteLease.whoHostAndPort.toString() << ' ' << durationCount(now - _voteLease.when) << " secs ago"; } else { _voteLease.when = now; _voteLease.whoId = args.whoid; _voteLease.whoHostAndPort = hopeful->getHostAndPort(); vote = _selfConfig().getNumVotes(); invariant(hopeful->getId() == args.whoid); if (vote > 0) { log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " (" << args.whoid << ')'; } } response->append("vote", vote); response->append("round", args.round); *result = Status::OK(); } // produce a reply to a heartbeat Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, const ReplSetHeartbeatArgs& args, const std::string& ourSetName, const OpTime& lastOpApplied, const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response) { if (args.getProtocolVersion() != 1) { return Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << args.getProtocolVersion()); } // Verify 
that replica set names match const std::string rshb = args.getSetName(); if (ourSetName != rshb) { log() << "replSet set names do not match, ours: " << ourSetName << "; remote node's: " << rshb; response->noteMismatched(); return Status(ErrorCodes::InconsistentReplicaSetNames, str::stream() << "Our set name of " << ourSetName << " does not match name " << rshb << " reported by remote node"); } const MemberState myState = getMemberState(); if (_selfIndex == -1) { if (myState.removed()) { return Status(ErrorCodes::InvalidReplicaSetConfig, "Our replica set configuration is invalid or does not include us"); } } else { invariant(_rsConfig.getReplSetName() == args.getSetName()); if (args.getSenderId() == _selfConfig().getId()) { return Status(ErrorCodes::BadValue, str::stream() << "Received heartbeat from member with the same " "member ID as ourself: " << args.getSenderId()); } } // This is a replica set response->noteReplSet(); response->setSetName(ourSetName); response->setState(myState.s); if (myState.primary()) { response->setElectionTime(_electionTime); } // Are we electable response->setElectable(!_getMyUnelectableReason(now, lastOpApplied)); // Heartbeat status message response->setHbMsg(_getHbmsg(now)); response->setTime(duration_cast(now - Date_t{})); response->setAppliedOpTime(lastOpApplied); response->setDurableOpTime(lastOpDurable); if (!_syncSource.empty()) { response->setSyncingTo(_syncSource); } if (!_rsConfig.isInitialized()) { response->setConfigVersion(-2); return Status::OK(); } const long long v = _rsConfig.getConfigVersion(); response->setConfigVersion(v); // Deliver new config if caller's version is older than ours if (v > args.getConfigVersion()) { response->setConfig(_rsConfig); } // Resolve the caller's id in our Member list int from = -1; if (v == args.getConfigVersion() && args.getSenderId() != -1) { from = _getMemberIndex(args.getSenderId()); } if (from == -1) { // Can't find the member, so we leave out the stateDisagreement field return 
Status::OK(); } invariant(from != _selfIndex); // if we thought that this node is down, let it know if (!_hbdata.at(from).up()) { response->noteStateDisagreement(); } // note that we got a heartbeat from this node _hbdata.at(from).setLastHeartbeatRecv(now); return Status::OK(); } Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, const ReplSetHeartbeatArgsV1& args, const std::string& ourSetName, const OpTime& lastOpApplied, const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response) { // Verify that replica set names match const std::string rshb = args.getSetName(); if (ourSetName != rshb) { log() << "replSet set names do not match, ours: " << ourSetName << "; remote node's: " << rshb; return Status(ErrorCodes::InconsistentReplicaSetNames, str::stream() << "Our set name of " << ourSetName << " does not match name " << rshb << " reported by remote node"); } const MemberState myState = getMemberState(); if (_selfIndex == -1) { if (myState.removed()) { return Status(ErrorCodes::InvalidReplicaSetConfig, "Our replica set configuration is invalid or does not include us"); } } else { if (args.getSenderId() == _selfConfig().getId()) { return Status(ErrorCodes::BadValue, str::stream() << "Received heartbeat from member with the same " "member ID as ourself: " << args.getSenderId()); } } response->setSetName(ourSetName); response->setState(myState.s); if (myState.primary()) { response->setElectionTime(_electionTime); } response->setAppliedOpTime(lastOpApplied); response->setDurableOpTime(lastOpDurable); if (_currentPrimaryIndex != -1) { response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId()); } response->setTerm(_term); if (!_syncSource.empty()) { response->setSyncingTo(_syncSource); } if (!_rsConfig.isInitialized()) { response->setConfigVersion(-2); return Status::OK(); } const long long v = _rsConfig.getConfigVersion(); response->setConfigVersion(v); // Deliver new config if caller's version is older than ours if (v > 
args.getConfigVersion()) { response->setConfig(_rsConfig); } // Resolve the caller's id in our Member list int from = -1; if (v == args.getConfigVersion() && args.getSenderId() != -1) { from = _getMemberIndex(args.getSenderId()); } if (from == -1) { return Status::OK(); } invariant(from != _selfIndex); // note that we got a heartbeat from this node _hbdata.at(from).setLastHeartbeatRecv(now); return Status::OK(); } int TopologyCoordinatorImpl::_getMemberIndex(int id) const { int index = 0; for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); ++it, ++index) { if (it->getId() == id) { return index; } } return -1; } std::pair TopologyCoordinatorImpl::prepareHeartbeatRequest( Date_t now, const std::string& ourSetName, const HostAndPort& target) { PingStats& hbStats = _pings[target]; Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); if (!_rsConfig.isInitialized() || (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { // This is either the first request ever for "target", or the heartbeat timeout has // passed, so we're starting a "new" heartbeat. hbStats.start(now); alreadyElapsed = Milliseconds(0); } ReplSetHeartbeatArgs hbArgs; hbArgs.setProtocolVersion(1); hbArgs.setCheckEmpty(false); if (_rsConfig.isInitialized()) { hbArgs.setSetName(_rsConfig.getReplSetName()); hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); if (_selfIndex >= 0) { const MemberConfig& me = _selfConfig(); hbArgs.setSenderHost(me.getHostAndPort()); hbArgs.setSenderId(me.getId()); } } else { hbArgs.setSetName(ourSetName); hbArgs.setConfigVersion(-2); } const Milliseconds timeoutPeriod(_rsConfig.isInitialized() ? 
_rsConfig.getHeartbeatTimeoutPeriodMillis() : ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod); const Milliseconds timeout = timeoutPeriod - alreadyElapsed; return std::make_pair(hbArgs, timeout); } std::pair TopologyCoordinatorImpl::prepareHeartbeatRequestV1( Date_t now, const std::string& ourSetName, const HostAndPort& target) { PingStats& hbStats = _pings[target]; Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64()); if (!_rsConfig.isInitialized() || (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { // This is either the first request ever for "target", or the heartbeat timeout has // passed, so we're starting a "new" heartbeat. hbStats.start(now); alreadyElapsed = Milliseconds(0); } ReplSetHeartbeatArgsV1 hbArgs; if (_rsConfig.isInitialized()) { hbArgs.setSetName(_rsConfig.getReplSetName()); hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); if (_selfIndex >= 0) { const MemberConfig& me = _selfConfig(); hbArgs.setSenderId(me.getId()); hbArgs.setSenderHost(me.getHostAndPort()); } hbArgs.setTerm(_term); } else { hbArgs.setSetName(ourSetName); // Config version -2 is for uninitialized config. hbArgs.setConfigVersion(-2); hbArgs.setTerm(OpTime::kInitialTerm); } const Milliseconds timeoutPeriod(_rsConfig.isInitialized() ? 
_rsConfig.getHeartbeatTimeoutPeriodMillis() : ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod); const Milliseconds timeout(timeoutPeriod - alreadyElapsed); return std::make_pair(hbArgs, timeout); } HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, const StatusWith& hbResponse, const OpTime& myLastOpApplied) { const MemberState originalState = getMemberState(); PingStats& hbStats = _pings[target]; invariant(hbStats.getLastHeartbeatStartDate() != Date_t()); const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) || (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed); if (!hbResponse.isOK()) { if (isUnauthorized) { hbStats.hit(networkRoundTripTime); } else { hbStats.miss(); } } else { hbStats.hit(networkRoundTripTime); // Log diagnostics. if (hbResponse.getValue().isStateDisagreement()) { LOG(1) << target << " thinks that we are down because they cannot send us heartbeats."; } } // If a node is not PRIMARY and has no sync source, // we increase the heartbeat rate in order // to help it find a sync source more quickly, // which helps ensure the PRIMARY will continue to // see the majority of the cluster. // // Arbiters also use half the election timeout period for their heartbeat frequency Milliseconds heartbeatInterval; if (_rsConfig.getProtocolVersion() == 1 && (getMemberState().arbiter() || (getSyncSourceAddress().empty() && !_iAmPrimary()))) { heartbeatInterval = _rsConfig.getElectionTimeoutPeriod() / 2; } else { heartbeatInterval = _rsConfig.getHeartbeatInterval(); } const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); Date_t nextHeartbeatStartDate; // Determine next heartbeat start time. if (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries && alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod()) { // There are still retries left, let's use one. 
nextHeartbeatStartDate = now; } else { nextHeartbeatStartDate = now + heartbeatInterval; } if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) { const long long currentConfigVersion = _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2; const ReplicaSetConfig& newConfig = hbResponse.getValue().getConfig(); if (newConfig.getConfigVersion() > currentConfigVersion) { HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction(); nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); return nextAction; } else { // Could be we got the newer version before we got the response, or the // target erroneously sent us one, even through it isn't newer. if (newConfig.getConfigVersion() < currentConfigVersion) { LOG(1) << "Config version from heartbeat was older than ours."; } else { LOG(2) << "Config from heartbeat response was same as ours."; } if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component, ::mongo::LogstreamBuilder::severityCast(2))) { LogstreamBuilder lsb = log(); if (_rsConfig.isInitialized()) { lsb << "Current config: " << _rsConfig.toBSON() << "; "; } lsb << "Config in heartbeat: " << newConfig.toBSON(); } } } // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do, // so return early. 
// (Continuation of the heartbeat-response processing begun above this chunk.)
// No config installed yet: nothing to update, just schedule the next heartbeat.
if (!_rsConfig.isInitialized()) {
    HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
    nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
    return nextAction;
}
// The responder may no longer be in our config; if so, ignore the response.
const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
if (memberIndex == -1) {
    LOG(1) << "Could not find " << target << " in current config so ignoring --"
              " current config: " << _rsConfig.toBSON();
    HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
    nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
    return nextAction;
}
invariant(memberIndex != _selfIndex);

MemberHeartbeatData& hbData = _hbdata.at(memberIndex);
const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
if (!hbResponse.isOK()) {
    if (isUnauthorized) {
        // Record the auth failure distinctly from a plain "down"; auth-only failures
        // are treated specially elsewhere (see _hasOnlyAuthErrorUpHeartbeats usage).
        LOG(1) << "setAuthIssue: heartbeat response failed due to authentication"
                  " issue for member _id:" << member.getId();
        hbData.setAuthIssue(now);
    } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries ||
               alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod()) {
        // Retries exhausted or timeout period elapsed: mark the member down.
        LOG(1) << "setDownValues: heartbeat response failed for member _id:" << member.getId()
               << ", msg: " << hbResponse.getStatus().reason();
        hbData.setDownValues(now, hbResponse.getStatus().reason());
    } else {
        // Still within the retry budget: leave the member's state untouched.
        LOG(3) << "Bad heartbeat response from " << target
               << "; trying again; Retries left: "
               << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; "
               << alreadyElapsed << " have already elapsed";
    }
} else {
    // Good response: fold the member's reported state/optime into our view.
    ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
    LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
           << ", msg: " << hbr.getHbMsg();
    hbData.setUpValues(now, member.getHostAndPort(), std::move(hbr));
}

// Dispatch primary-tracking to the PV0 or PV1 logic depending on the config.
HeartbeatResponseAction nextAction;
if (_rsConfig.getProtocolVersion() == 0) {
    nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now, myLastOpApplied);
} else {
    nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now, myLastOpApplied);
}

nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
}

/**
 * Marks the member at 'memberIndex' as down (election timeout expired without a
 * response) and, if this primary can consequently no longer see a majority,
 * schedules a stepdown of ourselves.  Only callable while we are primary.
 */
HeartbeatResponseAction TopologyCoordinatorImpl::setMemberAsDown(Date_t now,
                                                                 const int memberIndex,
                                                                 const OpTime& myLastOpApplied) {
    invariant(memberIndex != _selfIndex);
    invariant(memberIndex != -1);
    invariant(_currentPrimaryIndex == _selfIndex);
    MemberHeartbeatData& hbData = _hbdata.at(memberIndex);
    hbData.setDownValues(now, "no response within election timeout period");

    if (CannotSeeMajority & _getMyUnelectableReason(now, myLastOpApplied)) {
        // A stepdown may already be in flight; don't schedule a second one.
        if (_stepDownPending) {
            return HeartbeatResponseAction::makeNoAction();
        }
        _stepDownPending = true;
        log() << "can't see a majority of the set, relinquishing primary";
        return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
    }

    return HeartbeatResponseAction::makeNoAction();
}

HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
    int updatedConfigIndex,
    const MemberState& originalState,
    Date_t now,
    const OpTime& lastOpApplied) {
    //
    // Updates the local notion of which remote node, if any is primary.
    // Start the priority takeover process if we are eligible.
    //

    invariant(updatedConfigIndex != _selfIndex);

    // If we are missing from the config, do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }
    // If we are the primary, there must be no other primary, otherwise its higher term would
    // have already made us step down.
    if (_currentPrimaryIndex == _selfIndex) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
    int primaryIndex = -1;
    for (size_t i = 0; i < _hbdata.size(); i++) {
        const MemberHeartbeatData& member = _hbdata.at(i);
        if (member.getState().primary() && member.up()) {
            // Prefer the claimed primary with the highest term.
            if (primaryIndex == -1 || _hbdata.at(primaryIndex).getTerm() < member.getTerm()) {
                primaryIndex = i;
            }
        }
    }
    _currentPrimaryIndex = primaryIndex;
    if (_currentPrimaryIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Clear last heartbeat message on ourselves.
    setMyHeartbeatMessage(now, "");

    // Priority takeover when the replset is stable.
    //
    // Take over the primary only if the remote primary is in the latest term I know.
    // This is done only when we get a heartbeat response from the primary.
    // Otherwise, there must be an outstanding election, which may succeed or not, but
    // the remote primary will become aware of that election eventually and step down.
    if (_hbdata.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex &&
        _rsConfig.getMemberAt(primaryIndex).getPriority() <
            _rsConfig.getMemberAt(_selfIndex).getPriority()) {
        LOG(4) << "I can take over the primary due to higher priority."
               << " Current primary index: " << primaryIndex << " in term "
               << _hbdata.at(primaryIndex).getTerm();

        return HeartbeatResponseAction::makePriorityTakeoverAction();
    }
    return HeartbeatResponseAction::makeNoAction();
}

HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
    int updatedConfigIndex,
    const MemberState& originalState,
    Date_t now,
    const OpTime& lastOpApplied) {
    // This method has two interrelated responsibilities, performed in two phases.
    //
    // First, it updates the local notion of which remote node, if any is primary.  In the
    // process, it may request a remote primary to step down because there is a higher priority
    // node waiting, or because the local node thinks it is primary and that it has a more
    // recent electionTime.  It may instead decide that the local node should step down itself,
    // because a remote has a more recent election time.
    //
    // Second, if there is no remote primary, and the local node is not primary, it considers
    // whether or not to stand for election.
    invariant(updatedConfigIndex != _selfIndex);

    // We are missing from the config, so do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    ////////////////////
    // Phase 1
    ////////////////////

    // If we believe the node whose data was just updated is primary, confirm that
    // the updated data supports that notion.  If not, erase our notion of who is primary.
    if (updatedConfigIndex == _currentPrimaryIndex) {
        const MemberHeartbeatData& updatedHBData = _hbdata.at(updatedConfigIndex);
        if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
            _currentPrimaryIndex = -1;
        }
    }

    // If the current primary is not highest priority and up to date (within 10s),
    // have them/me stepdown.
    if (_currentPrimaryIndex != -1) {
        // check if we should ask the primary (possibly ourselves) to step down
        const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
        if (highestPriorityIndex != -1) {
            const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
            const MemberConfig& highestPriorityMember =
                _rsConfig.getMemberAt(highestPriorityIndex);
            // Our own applied optime is authoritative when we are the highest-priority node;
            // otherwise use the optime last reported via heartbeat.
            const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
                ? lastOpApplied
                : _hbdata.at(highestPriorityIndex).getAppliedOpTime();

            if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
                _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime, lastOpApplied)) {
                const OpTime latestOpTime = _latestKnownOpTime(lastOpApplied);

                if (_iAmPrimary()) {
                    // A stepdown may already be in flight; don't schedule a second one.
                    if (_stepDownPending) {
                        return HeartbeatResponseAction::makeNoAction();
                    }
                    _stepDownPending = true;
                    log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
                          << ") because " << highestPriorityMember.getHostAndPort()
                          << " has higher priority " << highestPriorityMember.getPriority()
                          << " and is only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind me";
                    // Back off our own re-election for a vote-lease plus one heartbeat.
                    const Date_t until =
                        now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
                    if (_electionSleepUntil < until) {
                        _electionSleepUntil = until;
                    }
                    return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
                } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
                    // If this node is the highest priority node, and it is not in
                    // an inter-election sleep period, ask the current primary to step down.
                    // This is an optimization, because the remote primary will almost certainly
                    // notice this node's electability promptly, via its own heartbeat process.
                    log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
                          << " (priority " << currentPrimaryMember.getPriority()
                          << ") step down because I have higher priority "
                          << highestPriorityMember.getPriority() << " and am only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind it";
                    int primaryIndex = _currentPrimaryIndex;
                    _currentPrimaryIndex = -1;
                    return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
                }
            }
        }
    }

    // Scan the member list's heartbeat data for who is primary, and update
    // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
{ int remotePrimaryIndex = -1; for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); if (itIndex == _selfIndex) { continue; } if (it->getState().primary() && it->up()) { if (remotePrimaryIndex != -1) { // two other nodes think they are primary (asynchronously polled) // -- wait for things to settle down. warning() << "two remote primaries (transiently)"; return HeartbeatResponseAction::makeNoAction(); } remotePrimaryIndex = itIndex; } } if (remotePrimaryIndex != -1) { // If it's the same as last time, don't do anything further. if (_currentPrimaryIndex == remotePrimaryIndex) { return HeartbeatResponseAction::makeNoAction(); } // Clear last heartbeat message on ourselves (why?) setMyHeartbeatMessage(now, ""); // If we are also primary, this is a problem. Determine who should step down. if (_iAmPrimary()) { Timestamp remoteElectionTime = _hbdata.at(remotePrimaryIndex).getElectionTime(); log() << "another primary seen with election time " << remoteElectionTime << " my election time is " << _electionTime; // Step down whomever has the older election time. if (remoteElectionTime > _electionTime) { if (_stepDownPending) { return HeartbeatResponseAction::makeNoAction(); } _stepDownPending = true; log() << "stepping down; another primary was elected more recently"; return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); } else { log() << "another PRIMARY detected and it should step down" " since it was elected earlier than me"; return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex); } } _currentPrimaryIndex = remotePrimaryIndex; return HeartbeatResponseAction::makeNoAction(); } } //////////////////// // Phase 2 //////////////////// // We do not believe any remote to be primary. // If we are primary, check if we can still see majority of the set; // stepdown if we can't. 
if (_iAmPrimary()) { if (CannotSeeMajority & _getMyUnelectableReason(now, lastOpApplied)) { if (_stepDownPending) { return HeartbeatResponseAction::makeNoAction(); } _stepDownPending = true; log() << "can't see a majority of the set, relinquishing primary"; return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); } LOG(2) << "Choosing to remain primary"; return HeartbeatResponseAction::makeNoAction(); } fassert(18505, _currentPrimaryIndex == -1); const MemberState currentState = getMemberState(); if (originalState.recovering() && currentState.secondary()) { // We just transitioned from RECOVERING to SECONDARY, this can only happen if we // received a heartbeat with an auth error when previously all the heartbeats we'd // received had auth errors. In this case, don't return makeElectAction() because // that could cause the election to start before the ReplicationCoordinator has updated // its notion of the member state to SECONDARY. Instead return noAction so that the // ReplicationCooridinator knows to update its tracking of the member state off of the // TopologyCoordinator, and leave starting the election until the next heartbeat comes // back. return HeartbeatResponseAction::makeNoAction(); } // At this point, there is no primary anywhere. Check to see if we should become a candidate. const auto status = checkShouldStandForElection(now, lastOpApplied); if (!status.isOK()) { // NOTE: This log line is checked in unit test(s). 
LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason(); return HeartbeatResponseAction::makeNoAction(); } fassertStatusOK(28816, becomeCandidateIfElectable(now, lastOpApplied)); return HeartbeatResponseAction::makeElectAction(); } Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now, const OpTime& lastOpApplied) const { if (_currentPrimaryIndex != -1) { return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"}; } invariant(_role != Role::leader); if (_role == Role::candidate) { return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; } const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, lastOpApplied); if (NotCloseEnoughToLatestOptime & unelectableReason) { return {ErrorCodes::NodeNotElectable, str::stream() << "Not standing for election because " << _getUnelectableReasonString(unelectableReason) << "; my last optime is " << lastOpApplied.toString() << " and the newest is " << _latestKnownOpTime(lastOpApplied).toString()}; } if (unelectableReason) { return {ErrorCodes::NodeNotElectable, str::stream() << "Not standing for election because " << _getUnelectableReasonString(unelectableReason)}; } if (_electionSleepUntil > now) { if (_rsConfig.getProtocolVersion() == 1) { return { ErrorCodes::NodeNotElectable, str::stream() << "Not standing for election before " << dateToISOStringLocal(_electionSleepUntil) << " because I stood up or learned about a new term too recently"}; } else { return {ErrorCodes::NodeNotElectable, str::stream() << "Not standing for election before " << dateToISOStringLocal(_electionSleepUntil) << " because I stood too recently"}; } } // All checks passed. Start election proceedings. 
return Status::OK(); } bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const { int vUp = 0; for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); if (itIndex == _selfIndex || it->up()) { vUp += _rsConfig.getMemberAt(itIndex).getNumVotes(); } } return vUp * 2 > _rsConfig.getTotalVotingMembers(); } bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect( const OpTime& otherOpTime, const OpTime& ourLastOpApplied) const { const OpTime latestKnownOpTime = _latestKnownOpTime(ourLastOpApplied); // Use addition instead of subtraction to avoid overflow. return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs()); } bool TopologyCoordinatorImpl::_iAmPrimary() const { if (_role == Role::leader) { invariant(_currentPrimaryIndex == _selfIndex); return true; } return false; } OpTime TopologyCoordinatorImpl::_latestKnownOpTime(const OpTime& ourLastOpApplied) const { OpTime latest = ourLastOpApplied; for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { if (indexOfIterator(_hbdata, it) == _selfIndex) { continue; } // Ignore down members if (!it->up()) { continue; } // Ignore removed nodes (not in config, so not valid). if (it->getState().removed()) { continue; } OpTime optime = it->getAppliedOpTime(); if (optime > latest) { latest = optime; } } return latest; } bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const { if (memberOneIndex == -1) return false; if (memberTwoIndex == -1) return true; return _rsConfig.getMemberAt(memberOneIndex).getPriority() > _rsConfig.getMemberAt(memberTwoIndex).getPriority(); } int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now, const OpTime& lastOpApplied) const { int maxIndex = -1; for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) { UnelectableReasonMask reason = currentIndex == _selfIndex ? 
_getMyUnelectableReason(now, lastOpApplied) : _getUnelectableReason(currentIndex, lastOpApplied); if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) { maxIndex = currentIndex; } } return maxIndex; } void TopologyCoordinatorImpl::prepareForStepDown() { _stepDownPending = true; } void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState, const Timestamp& electionTime) { invariant(_selfIndex != -1); if (newMemberState == getMemberState()) return; switch (newMemberState.s) { case MemberState::RS_PRIMARY: _role = Role::candidate; processWinElection(OID(), electionTime); invariant(_role == Role::leader); break; case MemberState::RS_SECONDARY: case MemberState::RS_ROLLBACK: case MemberState::RS_RECOVERING: case MemberState::RS_STARTUP2: _role = Role::follower; _followerMode = newMemberState.s; if (_currentPrimaryIndex == _selfIndex) { _currentPrimaryIndex = -1; _stepDownPending = false; } break; case MemberState::RS_STARTUP: updateConfig(ReplicaSetConfig(), -1, Date_t(), OpTime()); break; default: severe() << "Cannot switch to state " << newMemberState; invariant(false); } if (getMemberState() != newMemberState.s) { severe() << "Expected to enter state " << newMemberState << " but am now in " << getMemberState(); invariant(false); } log() << newMemberState; } void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { if (primaryIndex == _selfIndex) { changeMemberState_forTest(MemberState::RS_PRIMARY); } else { if (_iAmPrimary()) { changeMemberState_forTest(MemberState::RS_SECONDARY); } if (primaryIndex != -1) { ReplSetHeartbeatResponse hbResponse; hbResponse.setState(MemberState::RS_PRIMARY); hbResponse.setElectionTime(Timestamp()); hbResponse.setAppliedOpTime(_hbdata.at(primaryIndex).getAppliedOpTime()); hbResponse.setSyncingTo(HostAndPort()); hbResponse.setHbMsg(""); _hbdata.at(primaryIndex) .setUpValues(_hbdata.at(primaryIndex).getLastHeartbeat(), _rsConfig.getMemberAt(primaryIndex).getHostAndPort(), 
std::move(hbResponse)); } _currentPrimaryIndex = primaryIndex; } } const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const { if (_currentPrimaryIndex == -1) return NULL; return &(_rsConfig.getMemberAt(_currentPrimaryIndex)); } void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, Status* result) { // output for each member vector membersOut; const MemberState myState = getMemberState(); const Date_t now = rsStatusArgs.now; const OpTime& lastOpApplied = rsStatusArgs.lastOpApplied; const OpTime& lastOpDurable = rsStatusArgs.lastOpDurable; if (_selfIndex == -1) { // We're REMOVED or have an invalid config response->append("state", static_cast(myState.s)); response->append("stateStr", myState.toString()); response->append("uptime", rsStatusArgs.selfUptime); appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); response->appendDate("optimeDate", Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); if (_maintenanceModeCalls) { response->append("maintenanceMode", _maintenanceModeCalls); } std::string s = _getHbmsg(now); if (!s.empty()) response->append("infoMessage", s); *result = Status(ErrorCodes::InvalidReplicaSetConfig, "Our replica set config is invalid or we are not a member of it"); return; } for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); if (itIndex == _selfIndex) { // add self BSONObjBuilder bb; bb.append("_id", _selfConfig().getId()); bb.append("name", _selfConfig().getHostAndPort().toString()); bb.append("health", 1.0); bb.append("state", static_cast(myState.s)); bb.append("stateStr", myState.toString()); bb.append("uptime", rsStatusArgs.selfUptime); if (!_selfConfig().isArbiter()) { appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); bb.appendDate("optimeDate", Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); } 
if (!_syncSource.empty() && !_iAmPrimary()) { bb.append("syncingTo", _syncSource.toString()); } if (_maintenanceModeCalls) { bb.append("maintenanceMode", _maintenanceModeCalls); } std::string s = _getHbmsg(now); if (!s.empty()) bb.append("infoMessage", s); if (myState.primary()) { bb.append("electionTime", _electionTime); bb.appendDate("electionDate", Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs()))); } bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion()); bb.append("self", true); membersOut.push_back(bb.obj()); } else { // add non-self member const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex); BSONObjBuilder bb; bb.append("_id", itConfig.getId()); bb.append("name", itConfig.getHostAndPort().toString()); double h = it->getHealth(); bb.append("health", h); const MemberState state = it->getState(); bb.append("state", static_cast(state.s)); if (h == 0) { // if we can't connect the state info is from the past // and could be confusing to show bb.append("stateStr", "(not reachable/healthy)"); } else { bb.append("stateStr", it->getState().toString()); } const unsigned int uptime = static_cast(( it->getUpSince() != Date_t() ? 
durationCount(now - it->getUpSince()) : 0)); bb.append("uptime", uptime); if (!itConfig.isArbiter()) { appendOpTime(&bb, "optime", it->getAppliedOpTime(), _rsConfig.getProtocolVersion()); appendOpTime( &bb, "optimeDurable", it->getDurableOpTime(), _rsConfig.getProtocolVersion()); bb.appendDate( "optimeDate", Date_t::fromDurationSinceEpoch(Seconds(it->getAppliedOpTime().getSecs()))); bb.appendDate( "optimeDurableDate", Date_t::fromDurationSinceEpoch(Seconds(it->getDurableOpTime().getSecs()))); } bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); Milliseconds ping = _getPing(itConfig.getHostAndPort()); if (ping != UninitializedPing) { bb.append("pingMs", durationCount(ping)); std::string s = it->getLastHeartbeatMsg(); if (!s.empty()) bb.append("lastHeartbeatMessage", s); } if (it->hasAuthIssue()) { bb.append("authenticated", false); } const HostAndPort& syncSource = it->getSyncSource(); if (!syncSource.empty() && !state.primary()) { bb.append("syncingTo", syncSource.toString()); } if (state == MemberState::RS_PRIMARY) { bb.append("electionTime", it->getElectionTime()); bb.appendDate( "electionDate", Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs()))); } bb.appendIntOrLL("configVersion", it->getConfigVersion()); membersOut.push_back(bb.obj()); } } // sort members bson sort(membersOut.begin(), membersOut.end()); response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : ""); response->append("date", now); response->append("myState", myState.s); response->append("term", _term); // Add sync source info if (!_syncSource.empty() && !myState.primary() && !myState.removed()) { response->append("syncingTo", _syncSource.toString()); } if (_rsConfig.isConfigServer()) { response->append("configsvr", true); } response->append("heartbeatIntervalMillis", durationCount(_rsConfig.getHeartbeatInterval())); // New optimes, to hold them all. 
    // (Continues prepareStatusResponse: gather all optimes into one subdocument.)
    BSONObjBuilder optimes;
    rsStatusArgs.lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
    if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
        rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
    }
    appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
    appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
    response->append("optimes", optimes.obj());

    response->append("members", membersOut);
    *result = Status::OK();
}

// Populates '*response' with the replica-set information isMaster reports:
// the advertised hosts, this node's roles/flags, the current primary, and our tags.
void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
    const MemberState myState = getMemberState();
    if (!_rsConfig.isInitialized()) {
        response->markAsNoConfig();
        return;
    }

    for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
         it != _rsConfig.membersEnd();
         ++it) {
        // Hidden and slave-delayed members are not advertised at all.
        if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
            continue;
        }

        if (it->isElectable()) {
            response->addHost(it->getHostAndPort());
        } else if (it->isArbiter()) {
            response->addArbiter(it->getHostAndPort());
        } else {
            response->addPassive(it->getHostAndPort());
        }
    }

    response->setReplSetName(_rsConfig.getReplSetName());
    if (myState.removed()) {
        // We are not in the config: report the set name but no membership details.
        response->markAsNoConfig();
        return;
    }

    response->setReplSetVersion(_rsConfig.getConfigVersion());
    response->setIsMaster(myState.primary());
    response->setIsSecondary(myState.secondary());

    const MemberConfig* curPrimary = _currentPrimaryMember();
    if (curPrimary) {
        response->setPrimary(curPrimary->getHostAndPort());
    }

    const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
    if (selfConfig.isArbiter()) {
        response->setIsArbiterOnly(true);
    } else if (selfConfig.getPriority() == 0) {
        response->setIsPassive(true);
    }
    if (selfConfig.getSlaveDelay() > Seconds(0)) {
        response->setSlaveDelay(selfConfig.getSlaveDelay());
    }
    if (selfConfig.isHidden()) {
        response->setIsHidden(true);
    }
    if (!selfConfig.shouldBuildIndexes()) {
        response->setShouldBuildIndexes(false);
    }
    const ReplicaSetTagConfig tagConfig = _rsConfig.getTagConfig();
    if (selfConfig.hasTags(tagConfig)) {
        for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
             ++tag) {
            std::string tagKey = tagConfig.getTagKey(*tag);
            if (tagKey[0] == '$') {
                // Filter out internal tags
                continue;
            }
            response->addTag(tagKey, tagConfig.getTagValue(*tag));
        }
    }
    response->setMe(selfConfig.getHostAndPort());
    if (_iAmPrimary()) {
        response->setElectionId(_electionId);
    }
}

// Implements replSetFreeze: secs == 0 unfreezes immediately; otherwise refuse to
// seek election for 'secs' seconds (a no-op with a log message while primary).
void TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now,
                                                    int secs,
                                                    BSONObjBuilder* response) {
    if (secs == 0) {
        _stepDownUntil = now;
        log() << "'unfreezing'";
        response->append("info", "unfreezing");

        if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
            _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) {
            // If we are a one-node replica set, we're the one member,
            // we're electable, and we are currently in followerMode SECONDARY,
            // we must transition to candidate now that our stepdown period
            // is no longer active, in lieu of heartbeats.
            _role = Role::candidate;
        }
    } else {
        if (secs == 1)
            response->append("warning", "you really want to freeze for only 1 second?");

        if (!_iAmPrimary()) {
            _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
            log() << "'freezing' for " << secs << " seconds";
        } else {
            log() << "received freeze command but we are primary";
        }
    }
}

// If the stepdown period has elapsed and we are a single-node, electable set
// currently in SECONDARY follower mode, become a candidate; single-node sets
// receive no heartbeats that would otherwise trigger this transition.
bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
    if (_stepDownUntil > now) {
        return false;
    }

    if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
        _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) {
        // If the new config describes a one-node replica set, we're the one member,
        // we're electable, and we are currently in followerMode SECONDARY,
        // we must transition to candidate, in lieu of heartbeats.
_role = Role::candidate; return true; } return false; } void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) { if (_electionSleepUntil < newTime) { _electionSleepUntil = newTime; } } Timestamp TopologyCoordinatorImpl::getElectionTime() const { return _electionTime; } OID TopologyCoordinatorImpl::getElectionId() const { return _electionId; } int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const { return _currentPrimaryIndex; } Date_t TopologyCoordinatorImpl::getStepDownTime() const { return _stepDownUntil; } void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now) { std::vector oldHeartbeats; _hbdata.swap(oldHeartbeats); int index = 0; for (ReplicaSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd(); ++it, ++index) { const MemberConfig& newMemberConfig = *it; // TODO: C++11: use emplace_back() if (index == selfIndex) { // Insert placeholder for ourself, though we will never consult it. _hbdata.push_back(MemberHeartbeatData()); } else { MemberHeartbeatData newHeartbeatData; for (int oldIndex = 0; oldIndex < _rsConfig.getNumMembers(); ++oldIndex) { const MemberConfig& oldMemberConfig = _rsConfig.getMemberAt(oldIndex); if (oldMemberConfig.getId() == newMemberConfig.getId() && oldMemberConfig.getHostAndPort() == newMemberConfig.getHostAndPort()) { // This member existed in the old config with the same member ID and // HostAndPort, so copy its heartbeat data over. newHeartbeatData = oldHeartbeats[oldIndex]; break; } } _hbdata.push_back(newHeartbeatData); } } } // This function installs a new config object and recreates MemberHeartbeatData objects // that reflect the new config. 
void TopologyCoordinatorImpl::updateConfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now, const OpTime& lastOpApplied) { invariant(_role != Role::candidate); invariant(selfIndex < newConfig.getNumMembers()); // Reset term on startup and upgrade/downgrade of protocol version. if (!_rsConfig.isInitialized() || _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) { if (newConfig.getProtocolVersion() == 1) { _term = OpTime::kInitialTerm; } else { invariant(newConfig.getProtocolVersion() == 0); _term = OpTime::kUninitializedTerm; } LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config"; } _updateHeartbeatDataForReconfig(newConfig, selfIndex, now); _stepDownPending = false; _rsConfig = newConfig; _selfIndex = selfIndex; _forceSyncSourceIndex = -1; if (_role == Role::leader) { if (_selfIndex == -1) { log() << "Could not remain primary because no longer a member of the replica set"; } else if (!_selfConfig().isElectable()) { log() << " Could not remain primary because no longer electable"; } else { // Don't stepdown if you don't have to. _currentPrimaryIndex = _selfIndex; return; } _role = Role::follower; } // By this point we know we are in Role::follower _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 && _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) { // If the new config describes a one-node replica set, we're the one member, // we're electable, and we are currently in followerMode SECONDARY, // we must transition to candidate, in leiu of heartbeats. 
_role = Role::candidate; } } std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const { // ignore messages over 2 minutes old if ((now - _hbmsgTime) > Seconds{120}) { return ""; } return _hbmsg; } void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) { _hbmsgTime = now; _hbmsg = message; } const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const { return _rsConfig.getMemberAt(_selfIndex); } TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason( int index, const OpTime& lastOpApplied) const { invariant(index != _selfIndex); const MemberConfig& memberConfig = _rsConfig.getMemberAt(index); const MemberHeartbeatData& hbData = _hbdata.at(index); UnelectableReasonMask result = None; if (memberConfig.isArbiter()) { result |= ArbiterIAm; } if (memberConfig.getPriority() <= 0) { result |= NoPriority; } if (hbData.getState() != MemberState::RS_SECONDARY) { result |= NotSecondary; } if (_rsConfig.getProtocolVersion() == 0 && !_isOpTimeCloseEnoughToLatestToElect(hbData.getAppliedOpTime(), lastOpApplied)) { result |= NotCloseEnoughToLatestOptime; } if (hbData.up() && hbData.isUnelectable()) { result |= RefusesToStand; } invariant(result || memberConfig.isElectable()); return result; } TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason( const Date_t now, const OpTime& lastApplied) const { UnelectableReasonMask result = None; if (lastApplied.isNull()) { result |= NoData; } if (!_aMajoritySeemsToBeUp()) { result |= CannotSeeMajority; } if (_selfIndex == -1) { result |= NotInitialized; return result; } if (_selfConfig().isArbiter()) { result |= ArbiterIAm; } if (_selfConfig().getPriority() <= 0) { result |= NoPriority; } if (_stepDownUntil > now) { result |= StepDownPeriodActive; } // Cannot be electable unless secondary or already primary if (!getMemberState().secondary() && !_iAmPrimary()) { result |= NotSecondary; } // Election rules 
only for protocol version 0. if (_rsConfig.getProtocolVersion() == 0) { if (_voteLease.whoId != -1 && _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() && _voteLease.when + VoteLease::leaseTime >= now) { result |= VotedTooRecently; } if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied, lastApplied)) { result |= NotCloseEnoughToLatestOptime; } } return result; } std::string TopologyCoordinatorImpl::_getUnelectableReasonString( const UnelectableReasonMask ur) const { invariant(ur); str::stream ss; bool hasWrittenToStream = false; if (ur & NoData) { ss << "node has no applied oplog entries"; hasWrittenToStream = true; } if (ur & VotedTooRecently) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "I recently voted for " << _voteLease.whoHostAndPort.toString(); } if (ur & CannotSeeMajority) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "I cannot see a majority"; } if (ur & ArbiterIAm) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "member is an arbiter"; } if (ur & NoPriority) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "member has zero priority"; } if (ur & StepDownPeriodActive) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "I am still waiting for stepdown period to end at " << dateToISOStringLocal(_stepDownUntil); } if (ur & NotSecondary) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "member is not currently a secondary"; } if (ur & NotCloseEnoughToLatestOptime) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "member is more than 10 seconds behind the most up-to-date member"; } if (ur & NotInitialized) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "node is not a member of a valid replica set configuration"; } if (ur & RefusesToStand) { if (hasWrittenToStream) { ss << "; "; } hasWrittenToStream = true; ss << "most recent heartbeat 
indicates node will not stand for election"; } if (!hasWrittenToStream) { severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur); fassertFailed(26011); } ss << " (mask 0x" << integerToHex(ur) << ")"; return ss; } Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) { return _pings[host].getMillis(); } void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) { _electionTime = newElectionTime; } int TopologyCoordinatorImpl::_getTotalPings() { PingMap::iterator it = _pings.begin(); PingMap::iterator end = _pings.end(); int totalPings = 0; while (it != end) { totalPings += it->second.getCount(); it++; } return totalPings; } std::vector TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const { std::vector upHosts; for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); if (itIndex == _selfIndex) { continue; // skip ourselves } if (!it->maybeUp()) { continue; // skip DOWN nodes } upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort()); } return upHosts; } bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { if (_role != Role::candidate) { return false; } int selfId = _selfConfig().getId(); if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) { log() << "not voting yea for " << selfId << " voted for " << _voteLease.whoHostAndPort.toString() << ' ' << durationCount(now - _voteLease.when) << " secs ago"; return false; } _voteLease.when = now; _voteLease.whoId = selfId; _voteLease.whoHostAndPort = _selfConfig().getHostAndPort(); return true; } MemberState TopologyCoordinatorImpl::getMemberState() const { if (_selfIndex == -1) { if (_rsConfig.isInitialized()) { return MemberState::RS_REMOVED; } return MemberState::RS_STARTUP; } if (_rsConfig.isConfigServer()) { if (_options.configServerMode == CatalogManager::ConfigServerMode::NONE) { return MemberState::RS_REMOVED; } if 
(_options.configServerMode == CatalogManager::ConfigServerMode::CSRS) { invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown); if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) { return MemberState::RS_REMOVED; } } } else { if (_options.configServerMode != CatalogManager::ConfigServerMode::NONE) { return MemberState::RS_REMOVED; } } if (_role == Role::leader) { invariant(_currentPrimaryIndex == _selfIndex); return MemberState::RS_PRIMARY; } const MemberConfig& myConfig = _selfConfig(); if (myConfig.isArbiter()) { return MemberState::RS_ARBITER; } if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_hbdata, _selfIndex))) && (_followerMode == MemberState::RS_SECONDARY)) { return MemberState::RS_RECOVERING; } return _followerMode; } void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) { invariant(_role == Role::leader); _electionTime = electionOpTime; _electionId = electionId; } void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) { invariant(_role == Role::candidate); _role = Role::leader; setElectionInfo(electionId, electionOpTime); _currentPrimaryIndex = _selfIndex; _syncSource = HostAndPort(); _forceSyncSourceIndex = -1; } void TopologyCoordinatorImpl::processLoseElection() { invariant(_role == Role::candidate); const HostAndPort syncSourceAddress = getSyncSourceAddress(); _electionTime = Timestamp(0, 0); _electionId = OID(); _role = Role::follower; // Clear voteLease time, if we voted for ourselves in this election. // This will allow us to vote for others. 
if (_voteLease.whoId == _selfConfig().getId()) { _voteLease.when = Date_t(); } } bool TopologyCoordinatorImpl::stepDown(Date_t until, bool force, const OpTime& lastOpApplied) { bool canStepDown = force; for (int i = 0; !canStepDown && i < _rsConfig.getNumMembers(); ++i) { if (i == _selfIndex) { continue; } UnelectableReasonMask reason = _getUnelectableReason(i, lastOpApplied); if (!reason && _hbdata.at(i).getAppliedOpTime() >= lastOpApplied) { canStepDown = true; } } if (!canStepDown) { return false; } _stepDownUntil = until; _stepDownSelfAndReplaceWith(-1); return true; } void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) { invariant(_role == Role::follower); switch (newMode) { case MemberState::RS_RECOVERING: case MemberState::RS_ROLLBACK: case MemberState::RS_SECONDARY: case MemberState::RS_STARTUP2: _followerMode = newMode; break; default: invariant(false); } if (_followerMode != MemberState::RS_SECONDARY) { return; } // When a single node replica set transitions to SECONDARY, we must check if we should // be a candidate here. This is necessary because a single node replica set has no // heartbeats that would normally change the role to candidate. if (_rsConfig.getNumMembers() == 1 && _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) { _role = Role::candidate; } } bool TopologyCoordinatorImpl::stepDownIfPending() { if (!_stepDownPending) { return false; } int remotePrimaryIndex = -1; for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); if (itIndex == _selfIndex) { continue; } if (it->getState().primary() && it->up()) { if (remotePrimaryIndex != -1) { // two other nodes think they are primary (asynchronously polled) // -- wait for things to settle down. 
remotePrimaryIndex = -1; warning() << "two remote primaries (transiently)"; break; } remotePrimaryIndex = itIndex; } } _stepDownSelfAndReplaceWith(remotePrimaryIndex); return true; } void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) { invariant(_role == Role::leader); invariant(_selfIndex != -1); invariant(_selfIndex != newPrimary); invariant(_selfIndex == _currentPrimaryIndex); _currentPrimaryIndex = newPrimary; _role = Role::follower; _stepDownPending = false; } void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) { invariant(_role == Role::follower); _maintenanceModeCalls += inc; invariant(_maintenanceModeCalls >= 0); } int TopologyCoordinatorImpl::getMaintenanceCount() const { return _maintenanceModeCalls; } TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term, Date_t now) { if (term <= _term) { return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate; } // Don't run election if we just stood up or learned about a new term. _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod(); // Don't update the term just yet if we are going to step down, as we don't want to report // that we are primary in the new term. if (_iAmPrimary()) { return TopologyCoordinator::UpdateTermResult::kTriggerStepDown; } _term = term; return TopologyCoordinator::UpdateTermResult::kUpdatedTerm; } long long TopologyCoordinatorImpl::getTerm() { return _term; } bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, const OpTime& syncSourceLastOpTime, bool syncSourceHasSyncSource, Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's, // return true. // If the currentSource has the same replication progress as we do and has no source for further // progress, return true. 
// If the user requested a sync source change, return true. if (_forceSyncSourceIndex != -1) { return true; } const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); if (currentSourceIndex == -1) { return true; } invariant(currentSourceIndex != _selfIndex); OpTime currentSourceOpTime = std::max(syncSourceLastOpTime, _hbdata.at(currentSourceIndex).getAppliedOpTime()); if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should // change. return false; } if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource && currentSourceOpTime <= myLastOpTime && _hbdata.at(currentSourceIndex).getState() != MemberState::RS_PRIMARY) { return true; } unsigned int currentSecs = currentSourceOpTime.getSecs(); unsigned int goalSecs = currentSecs + durationCount(_options.maxSyncSourceLagSecs); for (std::vector::const_iterator it = _hbdata.begin(); it != _hbdata.end(); ++it) { const int itIndex = indexOfIterator(_hbdata, it); const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex); if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) && (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) && it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) && goalSecs < it->getAppliedOpTime().getSecs()) { log() << "re-evaluating sync source because our current sync source's most recent " << "OpTime is " << currentSourceOpTime.toString() << " which is more than " << _options.maxSyncSourceLagSecs << " behind member " << candidateConfig.getHostAndPort().toString() << " whose most recent OpTime is " << it->getAppliedOpTime().toString(); invariant(itIndex != _selfIndex); return true; } } return false; } void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const { *metadata = rpc::ReplSetMetadata(_term, lastCommittedOpTime, 
lastVisibleOpTime, _rsConfig.getConfigVersion(), _rsConfig.getReplicaSetId(), _currentPrimaryIndex, _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); } void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { output->setConfig(_rsConfig); output->setHBData(_hbdata); output->setSelfIndex(_selfIndex); output->setPrimaryIndex(_currentPrimaryIndex); output->setSelfState(getMemberState()); output->setSelfHeartbeatMessage(_hbmsg); } void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response, const OpTime& lastAppliedOpTime) { response->setTerm(_term); if (args.getTerm() < _term) { response->setVoteGranted(false); response->setReason("candidate's term is lower than mine"); } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) { response->setVoteGranted(false); response->setReason("candidate's config version differs from mine"); } else if (args.getSetName() != _rsConfig.getReplSetName()) { response->setVoteGranted(false); response->setReason("candidate's set name differs from mine"); } else if (args.getLastDurableOpTime() < lastAppliedOpTime) { response->setVoteGranted(false); response->setReason("candidate's data is staler than mine"); } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) { response->setVoteGranted(false); response->setReason("already voted for another candidate this term"); } else { if (!args.isADryRun()) { _lastVote.setTerm(args.getTerm()); _lastVote.setCandidateIndex(args.getCandidateIndex()); } response->setVoteGranted(true); } } void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) { _lastVote = lastVote; } void TopologyCoordinatorImpl::voteForMyselfV1() { _lastVote.setTerm(_term); _lastVote.setCandidateIndex(_selfIndex); } void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) { _currentPrimaryIndex = primaryIndex; } Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const 
Date_t now, const OpTime& lastOpApplied) { if (_role == Role::leader) { return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"}; } if (_role == Role::candidate) { return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; } const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, lastOpApplied); if (unelectableReason) { return {ErrorCodes::NodeNotElectable, str::stream() << "Not standing for election because " << _getUnelectableReasonString(unelectableReason)}; } // All checks passed, become a candidate and start election proceedings. _role = Role::candidate; return Status::OK(); } void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) { _storageEngineSupportsReadCommitted = supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } } // namespace repl } // namespace mongo