/**
 * Copyright 2014 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/topology_coordinator.h" #include #include #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/audit.h" #include "mongo/db/client.h" #include "mongo/db/mongod_options.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/heartbeat_response_action.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/member_data.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_html_summary.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/server_parameters.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace repl { const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30); // Controls how caught up in replication a secondary with higher priority than the current primary // must be before it will call for a priority takeover election. MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2); // If this fail point is enabled, TopologyCoordinator::shouldChangeSyncSource() will ignore // the option TopologyCoordinator::Options::maxSyncSourceLagSecs. The sync source will not be // re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds. 
MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs); constexpr Milliseconds TopologyCoordinator::PingStats::UninitializedPingTime; std::string TopologyCoordinator::roleToString(TopologyCoordinator::Role role) { switch (role) { case TopologyCoordinator::Role::kLeader: return "leader"; case TopologyCoordinator::Role::kFollower: return "follower"; case TopologyCoordinator::Role::kCandidate: return "candidate"; } invariant(false); } TopologyCoordinator::~TopologyCoordinator() {} std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) { return os << TopologyCoordinator::roleToString(role); } std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result) { switch (result) { case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction: return os << "no action"; case TopologyCoordinator::PrepareFreezeResponseResult::kElectSelf: return os << "elect self"; } MONGO_UNREACHABLE; } namespace { template int indexOfIterator(const std::vector& vec, typename std::vector::const_iterator& it) { return static_cast(it - vec.begin()); } /** * Returns true if the only up heartbeats are auth errors. 
*/ bool _hasOnlyAuthErrorUpHeartbeats(const std::vector& hbdata, const int selfIndex) { bool foundAuthError = false; for (std::vector::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) { if (indexOfIterator(hbdata, it) == selfIndex) { continue; } if (it->up()) { return false; } if (it->hasAuthIssue()) { foundAuthError = true; } } return foundAuthError; } void appendOpTime(BSONObjBuilder* bob, const char* elemName, const OpTime& opTime, const long long pv) { if (pv == 1) { opTime.append(bob, elemName); } else { bob->append(elemName, opTime.getTimestamp()); } } } // namespace void TopologyCoordinator::PingStats::start(Date_t now) { _lastHeartbeatStartDate = now; _numFailuresSinceLastStart = 0; _state = HeartbeatState::TRYING; } void TopologyCoordinator::PingStats::hit(Milliseconds millis) { _state = HeartbeatState::SUCCEEDED; ++hitCount; averagePingTimeMs = averagePingTimeMs == UninitializedPingTime ? millis : Milliseconds((averagePingTimeMs * 4 + millis) / 5); } void TopologyCoordinator::PingStats::miss() { ++_numFailuresSinceLastStart; // Transition to 'FAILED' state if this was our last retry. if (_numFailuresSinceLastStart > kMaxHeartbeatRetries) { _state = PingStats::HeartbeatState::FAILED; } } TopologyCoordinator::TopologyCoordinator(Options options) : _role(Role::kFollower), _term(OpTime::kUninitializedTerm), _currentPrimaryIndex(-1), _forceSyncSourceIndex(-1), _options(std::move(options)), _selfIndex(-1), _maintenanceModeCalls(0), _followerMode(MemberState::RS_STARTUP2) { invariant(getMemberState() == MemberState::RS_STARTUP); // Need an entry for self in the memberHearbeatData. 
_memberData.emplace_back(); _memberData.back().setIsSelf(true); } TopologyCoordinator::Role TopologyCoordinator::getRole() const { return _role; } void TopologyCoordinator::setForceSyncSourceIndex(int index) { invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); _forceSyncSourceIndex = index; } HostAndPort TopologyCoordinator::getSyncSourceAddress() const { return _syncSource; } HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, const OpTime& lastOpTimeFetched, ChainingPreference chainingPreference) { // If we are not a member of the current replica set configuration, no sync source is valid. if (_selfIndex == -1) { LOG(1) << "Cannot sync from any members because we are not in the replica set config"; return HostAndPort(); } // if we have a target we've requested to sync from, use it if (_forceSyncSourceIndex != -1) { invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); _forceSyncSourceIndex = -1; log() << "choosing sync source candidate by request: " << _syncSource; std::string msg(str::stream() << "syncing from: " << _syncSource.toString() << " by request"); setMyHeartbeatMessage(now, msg); return _syncSource; } // wait for 2N pings (not counting ourselves) before choosing a sync target int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings(); if (needMorePings > 0) { OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing"; _syncSource = HostAndPort(); return _syncSource; } // If we are only allowed to sync from the primary, set that if (chainingPreference == ChainingPreference::kUseConfiguration && !_rsConfig.isChainingAllowed()) { if (_currentPrimaryIndex == -1) { LOG(1) << "Cannot select a sync source because chaining is" " not allowed and primary is unknown/down"; _syncSource = HostAndPort(); return _syncSource; } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { LOG(1) << "Cannot 
select a sync source because chaining is not allowed and primary " "member is blacklisted: " << _currentPrimaryMember()->getHostAndPort(); _syncSource = HostAndPort(); return _syncSource; } else if (_currentPrimaryIndex == _selfIndex) { LOG(1) << "Cannot select a sync source because chaining is not allowed and we are primary"; _syncSource = HostAndPort(); return _syncSource; } else { _syncSource = _currentPrimaryMember()->getHostAndPort(); log() << "chaining not allowed, choosing primary as sync source candidate: " << _syncSource; std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString()); setMyHeartbeatMessage(now, msg); return _syncSource; } } // find the member with the lowest ping time that is ahead of me // choose a time that will exclude no candidates by default, in case we don't see a primary OpTime oldestSyncOpTime; // Find primary's oplog time. Reject sync candidates that are more than // _options.maxSyncSourceLagSecs seconds behind. if (_currentPrimaryIndex != -1) { OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime(); // Check if primaryOpTime is still close to 0 because we haven't received // our first heartbeat from a new primary yet. unsigned int maxLag = static_cast(durationCount(_options.maxSyncSourceLagSecs)); if (primaryOpTime.getSecs() >= maxLag) { oldestSyncOpTime = OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm()); } } int closestIndex = -1; // Make two attempts, with less restrictive rules the second time. // // During the first attempt, we ignore those nodes that have a larger slave // delay, hidden nodes or non-voting, and nodes that are excessively behind. // // For the second attempt include those nodes, in case those are the only ones we can reach. // // This loop attempts to set 'closestIndex', to select a viable candidate. 
for (int attempts = 0; attempts < 2; ++attempts) { for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) { const int itIndex = indexOfIterator(_memberData, it); // Don't consider ourselves. if (itIndex == _selfIndex) { continue; } const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex)); // Candidate must be up to be considered. if (!it->up()) { LOG(2) << "Cannot select sync source because it is not up: " << itMemberConfig.getHostAndPort(); continue; } // Candidate must be PRIMARY or SECONDARY state to be considered. if (!it->getState().readable()) { LOG(2) << "Cannot select sync source because it is not readable: " << itMemberConfig.getHostAndPort(); continue; } // On the first attempt, we skip candidates that do not match these criteria. if (attempts == 0) { // Candidate must be a voter if we are a voter. if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) { LOG(2) << "Cannot select sync source because we are a voter and it is not: " << itMemberConfig.getHostAndPort(); continue; } // Candidates must not be hidden. if (itMemberConfig.isHidden()) { LOG(2) << "Cannot select sync source because it is hidden: " << itMemberConfig.getHostAndPort(); continue; } // Candidates cannot be excessively behind. if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) { LOG(2) << "Cannot select sync source because it is too far behind." << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort() << ": " << it->getHeartbeatAppliedOpTime() << ", oldest acceptable optime: " << oldestSyncOpTime; continue; } // Candidate must not have a configured delay larger than ours. if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) { LOG(2) << "Cannot select sync source with larger slaveDelay than ours: " << itMemberConfig.getHostAndPort(); continue; } } // Candidate must build indexes if we build indexes, to be considered. 
if (_selfConfig().shouldBuildIndexes()) { if (!itMemberConfig.shouldBuildIndexes()) { LOG(2) << "Cannot select sync source with shouldBuildIndex differences: " << itMemberConfig.getHostAndPort(); continue; } } // only consider candidates that are ahead of where we are if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) { LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. " << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON() << ", latest oplog optime of sync candidate " << itMemberConfig.getHostAndPort() << ": " << it->getHeartbeatAppliedOpTime().toBSON(); continue; } // Candidate cannot be more latent than anything we've already considered. if ((closestIndex != -1) && (_getPing(itMemberConfig.getHostAndPort()) > _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) { LOG(2) << "Cannot select sync source with higher latency than the best candidate: " << itMemberConfig.getHostAndPort(); continue; } // Candidate cannot be blacklisted. 
if (_memberIsBlacklisted(itMemberConfig, now)) { LOG(1) << "Cannot select sync source which is blacklisted: " << itMemberConfig.getHostAndPort(); continue; } // This candidate has passed all tests; set 'closestIndex' closestIndex = itIndex; } if (closestIndex != -1) break; // no need for second attempt } if (closestIndex == -1) { // Did not find any members to sync from std::string msg("could not find member to sync from"); // Only log when we had a valid sync source before if (!_syncSource.empty()) { log() << msg << rsLog; } setMyHeartbeatMessage(now, msg); _syncSource = HostAndPort(); return _syncSource; } _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); log() << "sync source candidate: " << _syncSource; std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); setMyHeartbeatMessage(now, msg); return _syncSource; } bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const { std::map::const_iterator blacklisted = _syncSourceBlacklist.find(memberConfig.getHostAndPort()); if (blacklisted != _syncSourceBlacklist.end()) { if (blacklisted->second > now) { return true; } } return false; } void TopologyCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) { LOG(2) << "blacklisting " << host << " until " << until.toString(); _syncSourceBlacklist[host] = until; } void TopologyCoordinator::unblacklistSyncSource(const HostAndPort& host, Date_t now) { std::map::iterator hostItr = _syncSourceBlacklist.find(host); if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) { LOG(2) << "unblacklisting " << host; _syncSourceBlacklist.erase(hostItr); } } void TopologyCoordinator::clearSyncSourceBlacklist() { _syncSourceBlacklist.clear(); } void TopologyCoordinator::prepareSyncFromResponse(const HostAndPort& target, BSONObjBuilder* response, Status* result) { response->append("syncFromRequested", target.toString()); if (_selfIndex == -1) { *result = 
Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync"); return; } const MemberConfig& selfConfig = _selfConfig(); if (selfConfig.isArbiter()) { *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync"); return; } if (_selfIndex == _currentPrimaryIndex) { *result = Status(ErrorCodes::NotSecondary, "primaries don't sync"); return; } ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd(); int targetIndex = 0; for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); ++it) { if (it->getHostAndPort() == target) { targetConfig = it; break; } ++targetIndex; } if (targetConfig == _rsConfig.membersEnd()) { *result = Status(ErrorCodes::NodeNotFound, str::stream() << "Could not find member \"" << target.toString() << "\" in replica set"); return; } if (targetIndex == _selfIndex) { *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself"); return; } if (targetConfig->isArbiter()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it is an arbiter"); return; } if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it does not build indexes"); return; } if (selfConfig.isVoter() && !targetConfig->isVoter()) { *result = Status(ErrorCodes::InvalidOptions, str::stream() << "Cannot sync from \"" << target.toString() << "\" because it is not a voter"); return; } const MemberData& hbdata = _memberData.at(targetIndex); if (hbdata.hasAuthIssue()) { *result = Status(ErrorCodes::Unauthorized, str::stream() << "not authorized to communicate with " << target.toString()); return; } if (hbdata.getHealth() == 0) { *result = Status(ErrorCodes::HostUnreachable, str::stream() << "I cannot reach the requested member: " << target.toString()); return; } const OpTime lastOpApplied = 
getMyLastAppliedOpTime(); if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { warning() << "attempting to sync from " << target << ", but its latest opTime is " << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is " << lastOpApplied.getSecs() << " so this may not work"; response->append("warning", str::stream() << "requested member \"" << target.toString() << "\" is more than 10 seconds behind us"); // not returning bad Status, just warning } HostAndPort prevSyncSource = getSyncSourceAddress(); if (!prevSyncSource.empty()) { response->append("prevSyncTarget", prevSyncSource.toString()); } setForceSyncSourceIndex(targetIndex); *result = Status::OK(); } void TopologyCoordinator::prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, const Date_t now, BSONObjBuilder* response, Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << _rsConfig.getProtocolVersion()); return; } if (_selfIndex == -1) { *result = Status(ErrorCodes::ReplicaSetNotFound, "Cannot participate in elections because not initialized"); return; } if (args.setName != _rsConfig.getReplSetName()) { *result = Status(ErrorCodes::ReplicaSetNotFound, str::stream() << "Wrong repl set name. 
Expected: " << _rsConfig.getReplSetName() << ", received: " << args.setName); return; } if (args.id == static_cast(_selfConfig().getId())) { *result = Status(ErrorCodes::BadValue, str::stream() << "Received replSetFresh command from member with the " "same member ID as ourself: " << args.id); return; } bool weAreFresher = false; const OpTime lastOpApplied = getMyLastAppliedOpTime(); if (_rsConfig.getConfigVersion() > args.cfgver) { log() << "replSet member " << args.who << " is not yet aware its cfg version " << args.cfgver << " is stale"; response->append("info", "config version stale"); weAreFresher = true; } // check not only our own optime, but any other member we can reach else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) { weAreFresher = true; } response->appendDate("opTime", Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL())); response->append("fresher", weAreFresher); std::string errmsg; bool doVeto = _shouldVetoMember(args, now, &errmsg); response->append("veto", doVeto); if (doVeto) { response->append("errmsg", errmsg); } *result = Status::OK(); } bool TopologyCoordinator::_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args, const Date_t& now, std::string* errmsg) const { if (_rsConfig.getConfigVersion() < args.cfgver) { // We are stale; do not veto. 
return false; } const unsigned int memberID = args.id; const int hopefulIndex = _getMemberIndex(memberID); invariant(hopefulIndex != _selfIndex); const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); if (hopefulIndex == -1) { *errmsg = str::stream() << "replSet couldn't find member with id " << memberID; return true; } const OpTime lastOpApplied = getMyLastAppliedOpTime(); if (_iAmPrimary() && lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) { // hbinfo is not updated for ourself, so if we are primary we have to check the // primary's last optime separately *errmsg = str::stream() << "I am already primary, " << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " can try again once I've stepped down"; return true; } if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) && (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) { // other members might be aware of more up-to-date nodes *errmsg = str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is trying to elect itself but " << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() << " is already primary and more up-to-date"; return true; } if ((highestPriorityIndex != -1)) { const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex); const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex); if (priorityMember.getPriority() > hopefulMember.getPriority()) { *errmsg = str::stream() << hopefulMember.getHostAndPort().toString() << " has lower priority of " << hopefulMember.getPriority() << " than " << priorityMember.getHostAndPort().toString() << " which has a priority of " << priorityMember.getPriority(); return true; } } UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex); reason &= ~RefusesToStand; if (reason) { *errmsg = str::stream() << "I don't think " << 
_rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is electable because the " << _getUnelectableReasonString(reason); return true; } return false; } // produce a reply to a received electCmd void TopologyCoordinator::prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, const Date_t now, BSONObjBuilder* response, Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << _rsConfig.getProtocolVersion()); return; } if (_selfIndex == -1) { *result = Status(ErrorCodes::ReplicaSetNotFound, "Cannot participate in election because not initialized"); return; } const long long myver = _rsConfig.getConfigVersion(); const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); const MemberConfig* primary = _currentPrimaryMember(); const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid); const MemberConfig* highestPriority = highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex); int vote = 0; if (args.set != _rsConfig.getReplSetName()) { log() << "replSet error received an elect request for '" << args.set << "' but our set name is '" << _rsConfig.getReplSetName() << "'"; } else if (myver < args.cfgver) { // we are stale. don't vote log() << "replSetElect not voting because our config version is stale. Our version: " << myver << ", their version: " << args.cfgver; } else if (myver > args.cfgver) { // they are stale! log() << "replSetElect command received stale config version # during election. 
" "Our version: " << myver << ", their version: " << args.cfgver; vote = -10000; } else if (!hopeful) { log() << "replSetElect couldn't find member with id " << args.whoid; vote = -10000; } else if (_iAmPrimary()) { log() << "I am already primary, " << hopeful->getHostAndPort().toString() << " can try again once I've stepped down"; vote = -10000; } else if (primary) { log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but " << primary->getHostAndPort().toString() << " is already primary"; vote = -10000; } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) { // TODO(spencer): What if the lower-priority member is more up-to-date? log() << hopeful->getHostAndPort().toString() << " has lower priority than " << highestPriority->getHostAndPort().toString(); vote = -10000; } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) { log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for " << _voteLease.whoHostAndPort.toString() << ' ' << durationCount(now - _voteLease.when) << " secs ago"; } else { _voteLease.when = now; _voteLease.whoId = args.whoid; _voteLease.whoHostAndPort = hopeful->getHostAndPort(); vote = _selfConfig().getNumVotes(); invariant(hopeful->getId() == args.whoid); if (vote > 0) { log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " (" << args.whoid << ')'; } } response->append("vote", vote); response->append("round", args.round); *result = Status::OK(); } // produce a reply to a heartbeat Status TopologyCoordinator::prepareHeartbeatResponse(Date_t now, const ReplSetHeartbeatArgs& args, const std::string& ourSetName, ReplSetHeartbeatResponse* response) { if (args.getProtocolVersion() != 1) { return Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " << args.getProtocolVersion()); } // Verify that replica set names match const std::string rshb = 
args.getSetName();
    // NOTE(review): this chunk begins mid-function -- the head of prepareHeartbeatResponse
    // (PV0 heartbeat handling) is above this view. Template argument lists throughout this
    // chunk (std::pair<...>, StatusWith<...>, static_cast<...>, etc.) appear to have been
    // stripped by extraction -- confirm against the original file before building.

    // Reject heartbeats from a node that believes it is in a differently-named replica set.
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        response->noteMismatched();
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb
                                    << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        invariant(_rsConfig.getReplSetName() == args.getSetName());
        // A heartbeat claiming our own member id is a misconfiguration.
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    // This is a replica set
    response->noteReplSet();
    response->setSetName(ourSetName);
    response->setState(myState.s);
    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();

    // Are we electable
    response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout));

    // Heartbeat status message
    response->setHbMsg(_getHbmsg(now));
    response->setTime(duration_cast(now - Date_t{}));
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        // -2 signals an uninitialized local config to the sender.
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        // Can't find the member, so we leave out the stateDisagreement field
        return Status::OK();
    }
    invariant(from != _selfIndex);

    auto& fromNodeData = _memberData.at(from);
    // if we thought that this node is down, let it know
    if (!fromNodeData.up()) {
        response->noteStateDisagreement();
    }

    // note that we got a heartbeat from this node
    fromNodeData.setLastHeartbeatRecv(now);
    return Status::OK();
}

// Builds the response to a protocol-version-1 heartbeat: validates the set name and sender
// id, then reports our state, optimes, term, the primary we know of, our sync source, and
// (when the caller's config is older) our full config. Also records heartbeat receipt and
// liveness for the sending member when it is found in our config.
Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now,
                                                       const ReplSetHeartbeatArgsV1& args,
                                                       const std::string& ourSetName,
                                                       ReplSetHeartbeatResponse* response) {
    // Verify that replica set names match
    const std::string rshb = args.getSetName();
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb
                                    << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    response->setSetName(ourSetName);

    response->setState(myState.s);

    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }

    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    if (_currentPrimaryIndex != -1) {
        response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
    }

    response->setTerm(_term);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        // -2 signals an uninitialized local config to the sender.
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        return Status::OK();
    }
    invariant(from != _selfIndex);

    auto& fromNodeData = _memberData.at(from);
    // note that we got a heartbeat from this node
    fromNodeData.setLastHeartbeatRecv(now);
    // Update liveness for sending node.
    fromNodeData.updateLiveness(now);
    return Status::OK();
}

// Linear scan of the config's member list; returns the index of the member with the given
// id, or -1 when no member has that id.
int TopologyCoordinator::_getMemberIndex(int id) const {
    int index = 0;
    for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
         ++it, ++index) {
        if (it->getId() == id) {
            return index;
        }
    }
    return -1;
}

// Prepares a protocol-version-0 heartbeat request for 'target' and computes the remaining
// timeout budget (full timeout minus time already spent retrying this heartbeat round).
// NOTE(review): return type's template arguments were stripped by extraction -- presumably
// std::pair<ReplSetHeartbeatArgs, Milliseconds>; confirm against the original file.
std::pair TopologyCoordinator::prepareHeartbeatRequest(
    Date_t now, const std::string& ourSetName, const HostAndPort& target) {
    PingStats& hbStats = _pings[target];
    Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    if (!_rsConfig.isInitialized() || !hbStats.trying() ||
        (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
        // This is either the first request ever for "target", or the heartbeat timeout has
        // passed, so we're starting a "new" heartbeat.
        hbStats.start(now);
        alreadyElapsed = Milliseconds(0);
    }
    ReplSetHeartbeatArgs hbArgs;
    hbArgs.setProtocolVersion(1);
    hbArgs.setCheckEmpty(false);
    if (_rsConfig.isInitialized()) {
        hbArgs.setSetName(_rsConfig.getReplSetName());
        hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
        if (_selfIndex >= 0) {
            const MemberConfig& me = _selfConfig();
            hbArgs.setSenderHost(me.getHostAndPort());
            hbArgs.setSenderId(me.getId());
        }
    } else {
        hbArgs.setSetName(ourSetName);
        // Config version -2 is for uninitialized config.
        hbArgs.setConfigVersion(-2);
    }
    hbArgs.setHeartbeatVersion(1);

    const Milliseconds timeoutPeriod(
        _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
                                  : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
    // Remaining budget for this heartbeat round after any retries already used.
    const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
    return std::make_pair(hbArgs, timeout);
}

// Protocol-version-1 analogue of prepareHeartbeatRequest: additionally carries our term.
std::pair TopologyCoordinator::prepareHeartbeatRequestV1(
    Date_t now, const std::string& ourSetName, const HostAndPort& target) {
    PingStats& hbStats = _pings[target];
    Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
    if ((!_rsConfig.isInitialized()) || !hbStats.trying() ||
        (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
        // This is either the first request ever for "target", or the heartbeat timeout has
        // passed, so we're starting a "new" heartbeat.
        hbStats.start(now);
        alreadyElapsed = Milliseconds(0);
    }
    ReplSetHeartbeatArgsV1 hbArgs;
    if (_rsConfig.isInitialized()) {
        hbArgs.setSetName(_rsConfig.getReplSetName());
        hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
        if (_selfIndex >= 0) {
            const MemberConfig& me = _selfConfig();
            hbArgs.setSenderId(me.getId());
            hbArgs.setSenderHost(me.getHostAndPort());
        }
        hbArgs.setTerm(_term);
    } else {
        hbArgs.setSetName(ourSetName);
        // Config version -2 is for uninitialized config.
        hbArgs.setConfigVersion(-2);
        hbArgs.setTerm(OpTime::kInitialTerm);
    }
    hbArgs.setHeartbeatVersion(1);

    const Milliseconds timeoutPeriod(
        _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
                                  : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
    const Milliseconds timeout(timeoutPeriod - alreadyElapsed);
    return std::make_pair(hbArgs, timeout);
}

// Processes the outcome of a heartbeat sent to 'target': records hit/miss ping stats,
// schedules the next heartbeat (retry now, or wait one interval), requests a reconfig when
// the response carries a newer config, updates the target's MemberData (up/down, optimes),
// and finally delegates primary-tracking/election decisions to _updatePrimaryFromHBData[V1].
HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
    Date_t now,
    Milliseconds networkRoundTripTime,
    const HostAndPort& target,
    const StatusWith& hbResponse) {
    const MemberState originalState = getMemberState();
    PingStats& hbStats = _pings[target];
    invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
    const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
        (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
    if (!hbResponse.isOK()) {
        // An auth failure still proves the node is reachable, so count it as a hit.
        if (isUnauthorized) {
            hbStats.hit(networkRoundTripTime);
        } else {
            hbStats.miss();
        }
    } else {
        hbStats.hit(networkRoundTripTime);
        // Log diagnostics.
        if (hbResponse.getValue().isStateDisagreement()) {
            LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
        }
    }

    // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order
    // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to
    // see the majority of the cluster.
    //
    // Arbiters also decrease their heartbeat interval to at most half the election timeout period.
    Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval();
    if (_rsConfig.getProtocolVersion() == 1) {
        if (getMemberState().arbiter()) {
            heartbeatInterval =
                std::min(_rsConfig.getElectionTimeoutPeriod() / 2, _rsConfig.getHeartbeatInterval());
        } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) {
            heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
                                         _rsConfig.getHeartbeatInterval() / 4);
        }
    }

    const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    Date_t nextHeartbeatStartDate;
    // Determine the next heartbeat start time. If a heartbeat has not succeeded or failed, and we
    // have not used up the timeout period, we should retry.
    if (hbStats.trying() && (alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod())) {
        // There are still retries left, let's use one.
        nextHeartbeatStartDate = now;
    } else {
        nextHeartbeatStartDate = now + heartbeatInterval;
    }

    if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
        const long long currentConfigVersion =
            _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
        const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
        if (newConfig.getConfigVersion() > currentConfigVersion) {
            HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
            nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
            return nextAction;
        } else {
            // Could be we got the newer version before we got the response, or the
            // target erroneously sent us one, even through it isn't newer.
            if (newConfig.getConfigVersion() < currentConfigVersion) {
                LOG(1) << "Config version from heartbeat was older than ours.";
            } else {
                LOG(2) << "Config from heartbeat response was same as ours.";
            }
            if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
                                                     ::mongo::LogstreamBuilder::severityCast(2))) {
                LogstreamBuilder lsb = log();
                if (_rsConfig.isInitialized()) {
                    lsb << "Current config: " << _rsConfig.toBSON() << "; ";
                }
                lsb << "Config in heartbeat: " << newConfig.toBSON();
            }
        }
    }

    // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
    // so return early.
    if (!_rsConfig.isInitialized()) {
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    // If we're not in the config, we don't need to respond to heartbeats.
    if (_selfIndex == -1) {
        LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target
               << " -- current config: " << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
    if (memberIndex == -1) {
        LOG(1) << "Could not find " << target << " in current config so ignoring --"
                                                 " current config: "
               << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }

    invariant(memberIndex != _selfIndex);

    MemberData& hbData = _memberData.at(memberIndex);
    const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
    bool advancedOpTime = false;
    if (!hbResponse.isOK()) {
        if (isUnauthorized) {
            hbData.setAuthIssue(now);
        }
        // If the heartbeat has failed i.e. used up all retries, then we mark the target node as
        // down.
        else if (hbStats.failed() || (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod())) {
            hbData.setDownValues(now, hbResponse.getStatus().reason());
        } else {
            LOG(3) << "Bad heartbeat response from " << target
                   << "; trying again; Retries left: " << (hbStats.retriesLeft()) << "; "
                   << alreadyElapsed << " have already elapsed";
        }
    } else {
        ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
        LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
               << ", msg: " << hbr.getHbMsg();
        advancedOpTime = hbData.setUpValues(now, std::move(hbr));
    }

    HeartbeatResponseAction nextAction;
    if (_rsConfig.getProtocolVersion() == 0) {
        nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now);
    } else {
        nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
    }

    nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
    nextAction.setAdvancedOpTime(advancedOpTime);

    return nextAction;
}

// Returns true when at least 'numNodes' members (including ourselves) have reached
// 'targetOpTime', using durable or applied optimes per 'durablyWritten'.
bool TopologyCoordinator::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
                                                    int numNodes,
                                                    bool durablyWritten) {
    // Replication progress that is for some reason ahead of us should not allow us to
    // satisfy a write concern if we aren't caught up ourselves.
    OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
    if (myOpTime < targetOpTime) {
        return false;
    }

    for (auto&& memberData : _memberData) {
        const OpTime& memberOpTime =
            durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
        if (memberOpTime >= targetOpTime) {
            --numNodes;
        }

        if (numNodes <= 0) {
            return true;
        }
    }
    return false;
}

// Returns true when some member that has reached 'opTime' carries a tag matching
// 'tagPattern' (used for tag-based write concerns).
bool TopologyCoordinator::haveTaggedNodesReachedOpTime(const OpTime& opTime,
                                                       const ReplSetTagPattern& tagPattern,
                                                       bool durablyWritten) {
    ReplSetTagMatch matcher(tagPattern);
    for (auto&& memberData : _memberData) {
        const OpTime& memberOpTime =
            durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
        if (memberOpTime >= opTime) {
            // This node has reached the desired optime, now we need to check if it is a part
            // of the tagPattern.
            int memberIndex = memberData.getConfigIndex();
            invariant(memberIndex >= 0);
            const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex);
            for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
                 it != memberConfig.tagsEnd();
                 ++it) {
                if (matcher.update(*it)) {
                    return true;
                }
            }
        }
    }
    return false;
}

// Marks members stale when we have not heard from them within the election timeout; when we
// are primary and consequently lose sight of a majority, returns a step-down-self action.
HeartbeatResponseAction TopologyCoordinator::checkMemberTimeouts(Date_t now) {
    bool stepdown = false;
    for (int memberIndex = 0; memberIndex < static_cast(_memberData.size()); memberIndex++) {
        auto& memberData = _memberData[memberIndex];
        if (!memberData.isSelf() && !memberData.lastUpdateStale() &&
            now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) {
            memberData.markLastUpdateStale();
            if (_iAmPrimary()) {
                stepdown = stepdown || setMemberAsDown(now, memberIndex);
            }
        }
    }
    if (stepdown) {
        log() << "can't see a majority of the set, relinquishing primary";
        return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
    }
    return HeartbeatResponseAction::makeNoAction();
}

// Returns the hosts whose applied (or durable) optime has reached 'op', optionally
// excluding ourselves.
std::vector TopologyCoordinator::getHostsWrittenTo(const OpTime& op,
                                                   bool durablyWritten,
                                                   bool skipSelf) {
    std::vector hosts;
    for (const auto& memberData : _memberData) {
        if (skipSelf && memberData.isSelf()) {
            continue;
        }

        if (durablyWritten) {
            if (memberData.getLastDurableOpTime() < op) {
                continue;
            }
        } else if (memberData.getLastAppliedOpTime() < op) {
            continue;
        }

        hosts.push_back(memberData.getHostAndPort());
    }
    return hosts;
}

// Marks 'memberIndex' down (caller must be primary and may not target itself); returns true
// when losing that member costs us sight of a majority, i.e. the caller should step down.
bool TopologyCoordinator::setMemberAsDown(Date_t now, const int memberIndex) {
    invariant(memberIndex != _selfIndex);
    invariant(memberIndex != -1);
    invariant(_currentPrimaryIndex == _selfIndex);
    MemberData& hbData = _memberData.at(memberIndex);
    hbData.setDownValues(now, "no response within election timeout period");

    if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
        return true;
    }

    return false;
}

// Returns the (memberId, lastUpdate) of the live, non-self member with the oldest
// lastUpdate; (-1, Date_t::max()) when every other member is already stale.
std::pair TopologyCoordinator::getStalestLiveMember() const {
    Date_t earliestDate = Date_t::max();
    int earliestMemberId = -1;
    for (const auto& memberData : _memberData) {
        if (memberData.isSelf()) {
            continue;
        }
        if (memberData.lastUpdateStale()) {
            // Already stale.
            continue;
        }
        LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate();
        if (earliestDate > memberData.getLastUpdate()) {
            earliestDate = memberData.getLastUpdate();
            earliestMemberId = memberData.getMemberId();
        }
    }
    LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate;
    return std::make_pair(earliestMemberId, earliestDate);
}

// Refreshes liveness on every member's data.
void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) {
    for (auto&& memberData : _memberData)
        memberData.updateLiveness(now);
}

// Refreshes liveness only for members whose host appears in 'member_set'.
void TopologyCoordinator::resetMemberTimeouts(Date_t now,
                                              const stdx::unordered_set& member_set) {
    for (auto&& memberData : _memberData) {
        if (member_set.count(memberData.getHostAndPort()))
            memberData.updateLiveness(now);
    }
}

OpTime TopologyCoordinator::getMyLastAppliedOpTime() const {
    return _selfMemberData().getLastAppliedOpTime();
}

// Sets our own last applied optime; moving it backwards requires isRollbackAllowed.
void TopologyCoordinator::setMyLastAppliedOpTime(OpTime opTime, Date_t now, bool isRollbackAllowed) {
    auto& myMemberData = _selfMemberData();
    invariant(isRollbackAllowed || opTime >= myMemberData.getLastAppliedOpTime());
    myMemberData.setLastAppliedOpTime(opTime, now);
}

OpTime TopologyCoordinator::getMyLastDurableOpTime() const {
    return _selfMemberData().getLastDurableOpTime();
}

// Sets our own last durable optime; moving it backwards requires isRollbackAllowed.
void TopologyCoordinator::setMyLastDurableOpTime(OpTime opTime, Date_t now, bool isRollbackAllowed) {
    auto& myMemberData = _selfMemberData();
    invariant(isRollbackAllowed || opTime >= myMemberData.getLastDurableOpTime());
    myMemberData.setLastDurableOpTime(opTime, now);
}

// Applies one replSetUpdatePosition entry: advances the named member's applied/durable
// optimes. Returns whether an optime advanced, or an error (removed self, bad member id,
// config version mismatch -- which also reports our version via 'configVersion' -- or
// unknown member). Updates claiming to be about ourselves are ignored (returns false).
StatusWith TopologyCoordinator::setLastOptime(const UpdatePositionArgs::UpdateInfo& args,
                                              Date_t now,
                                              long long* configVersion) {
    if (_selfIndex == -1) {
        // Ignore updates when we're in state REMOVED.
        return Status(ErrorCodes::NotMasterOrSecondary,
                      "Received replSetUpdatePosition command but we are in state REMOVED");
    }
    invariant(_rsConfig.isInitialized());  // Can only use setLastOptime in replSet mode.

    if (args.memberId < 0) {
        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " which is negative and therefore invalid";
        LOG(1) << errmsg;
        return Status(ErrorCodes::NodeNotFound, errmsg);
    }

    if (args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) {
        // Do not let remote nodes tell us what our optime is.
        return false;
    }

    LOG(2) << "received notification that node with memberID " << args.memberId
           << " in config with version " << args.cfgver
           << " has reached optime: " << args.appliedOpTime
           << " and is durable through: " << args.durableOpTime;

    if (args.cfgver != _rsConfig.getConfigVersion()) {
        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " whose config version of " << args.cfgver << " doesn't match our config version of "
            << _rsConfig.getConfigVersion();
        LOG(1) << errmsg;
        *configVersion = _rsConfig.getConfigVersion();
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }

    auto* memberData = _findMemberDataByMemberId(args.memberId);
    if (!memberData) {
        invariant(!_rsConfig.findMemberByID(args.memberId));

        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " which doesn't exist in our config";
        LOG(1) << errmsg;
        return Status(ErrorCodes::NodeNotFound, errmsg);
    }

    invariant(args.memberId == memberData->getMemberId());

    LOG(3) << "Node with memberID " << args.memberId << " currently has optime "
           << memberData->getLastAppliedOpTime() << " durable through "
           << memberData->getLastDurableOpTime() << "; updating to optime " << args.appliedOpTime
           << " and durable through " << args.durableOpTime;

    bool advancedOpTime = memberData->advanceLastAppliedOpTime(args.appliedOpTime, now);
    advancedOpTime =
        memberData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime;
    return advancedOpTime;
}

// Master/slave-only progress update keyed by rid: advances the matching entry's applied
// optime, creating a new MemberData entry when the rid is unknown. Asserts (28576) if the
// node is actually configured as a replica set.
void TopologyCoordinator::setLastOptimeForSlave(const OID& rid, const OpTime& opTime, Date_t now) {
    massert(28576,
            "Received an old style replication progress update, which is only used for Master/"
            "Slave replication now, but this node is not using Master/Slave replication. "
            "This is likely caused by an old (pre-2.6) member syncing from this node.",
            !_rsConfig.isInitialized());
    auto* memberData = _findMemberDataByRid(rid);
    if (memberData) {
        memberData->advanceLastAppliedOpTime(opTime, now);
    } else {
        invariant(!_memberData.empty());  // Must always have our own entry first.
        _memberData.emplace_back();
        memberData = &_memberData.back();
        memberData->setRid(rid);
        memberData->setLastAppliedOpTime(opTime, now);
    }
}

void TopologyCoordinator::setMyRid(const OID& rid) {
    _selfMemberData().setRid(rid);
}

// Returns the MemberData for 'memberId', or nullptr when not in the config.
MemberData* TopologyCoordinator::_findMemberDataByMemberId(const int memberId) {
    const int memberIndex = _getMemberIndex(memberId);
    if (memberIndex >= 0)
        return &_memberData[memberIndex];
    return nullptr;
}

// Returns the MemberData tracked under 'rid' (master/slave tracking), or nullptr.
MemberData* TopologyCoordinator::_findMemberDataByRid(const OID rid) {
    for (auto& memberData : _memberData) {
        if (memberData.getRid() == rid)
            return &memberData;
    }
    return nullptr;
}

// Master/slave handshake: registers a new (rid, host) tracking entry. Rejected when this
// node is configured as a replica set.
Status TopologyCoordinator::processHandshake(const OID& rid, const HostAndPort& hostAndPort) {
    if (_rsConfig.isInitialized()) {
        return Status(ErrorCodes::IllegalOperation,
                      "The handshake command is only used for master/slave replication");
    }
    auto* memberData = _findMemberDataByRid(rid);
    if (!memberData) {
        invariant(!_memberData.empty());  // Must always have our own entry first.
        _memberData.emplace_back();
        auto& newMember = _memberData.back();
        newMember.setRid(rid);
        newMember.setHostAndPort(hostAndPort);
    }
    return Status::OK();
}

HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
    int updatedConfigIndex, const MemberState& originalState, Date_t now) {
    //
    // Updates the local notion of which remote node, if any is primary.
    // Start the priority takeover process if we are eligible.
    //

    invariant(updatedConfigIndex != _selfIndex);

    // If we are missing from the config, do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }
    // If we are the primary, there must be no other primary, otherwise its higher term would
    // have already made us step down.
    if (_currentPrimaryIndex == _selfIndex) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
    int primaryIndex = -1;
    for (size_t i = 0; i < _memberData.size(); i++) {
        const MemberData& member = _memberData.at(i);
        if (member.getState().primary() && member.up()) {
            // Prefer the claimed primary with the highest term.
            if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) {
                primaryIndex = i;
            }
        }
    }
    _currentPrimaryIndex = primaryIndex;
    if (_currentPrimaryIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Clear last heartbeat message on ourselves.
    setMyHeartbeatMessage(now, "");

    // Takeover when the replset is stable.
    //
    // Take over the primary only if the remote primary is in the latest term I know.
    // This is done only when we get a heartbeat response from the primary.
    // Otherwise, there must be an outstanding election, which may succeed or not, but
    // the remote primary will become aware of that election eventually and step down.
    if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {

        // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
        bool catchupTakeoverDisabled =
            ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
            ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();

        bool scheduleCatchupTakeover = false;
        bool schedulePriorityTakeover = false;

        if (!catchupTakeoverDisabled &&
            (_memberData.at(primaryIndex).getLastAppliedOpTime() <
             _memberData.at(_selfIndex).getLastAppliedOpTime())) {
            LOG(2) << "I can take over the primary due to fresher data."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm() << "."
                   << " Current primary optime: "
                   << _memberData.at(primaryIndex).getLastAppliedOpTime()
                   << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();

            scheduleCatchupTakeover = true;
        }

        if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
            _rsConfig.getMemberAt(_selfIndex).getPriority()) {
            LOG(2) << "I can take over the primary due to higher priority."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm();

            schedulePriorityTakeover = true;
        }

        // Calculate rank of current node. A rank of 0 indicates that it has the highest priority.
        auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority();

        // Schedule a priority takeover early only if we know that the current node has the highest
        // priority in the replica set, has a higher priority than the primary, and is the most
        // up to date node.
        // Otherwise, prefer to schedule a catchup takeover over a priority takeover
        if (scheduleCatchupTakeover && schedulePriorityTakeover &&
            _rsConfig.calculatePriorityRank(currentNodePriority) == 0) {
            LOG(2) << "I can take over the primary because I have a higher priority, the highest "
                   << "priority in the replica set, and fresher data."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm();
            return HeartbeatResponseAction::makePriorityTakeoverAction();
        }
        if (scheduleCatchupTakeover) {
            return HeartbeatResponseAction::makeCatchupTakeoverAction();
        }
        if (schedulePriorityTakeover) {
            return HeartbeatResponseAction::makePriorityTakeoverAction();
        }
    }
    return HeartbeatResponseAction::makeNoAction();
}

HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBData(
    int updatedConfigIndex, const MemberState& originalState, Date_t now) {
    // This method has two interrelated responsibilities, performed in two phases.
    //
    // First, it updates the local notion of which remote node, if any is primary. In the
    // process, it may request a remote primary to step down because there is a higher priority
    // node waiting, or because the local node thinks it is primary and that it has a more
    // recent electionTime. It may instead decide that the local node should step down itself,
    // because a remote has a more recent election time.
    //
    // Second, if there is no remote primary, and the local node is not primary, it considers
    // whether or not to stand for election.

    invariant(updatedConfigIndex != _selfIndex);

    // We are missing from the config, so do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    ////////////////////
    // Phase 1
    ////////////////////

    // If we believe the node whose data was just updated is primary, confirm that
    // the updated data supports that notion. If not, erase our notion of who is primary.
    if (updatedConfigIndex == _currentPrimaryIndex) {
        const MemberData& updatedHBData = _memberData.at(updatedConfigIndex);
        if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
            _currentPrimaryIndex = -1;
        }
    }

    // If the current primary is not highest priority and up to date (within 10s),
    // have them/me stepdown.
    if (_currentPrimaryIndex != -1) {
        // check if we should ask the primary (possibly ourselves) to step down
        const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
        if (highestPriorityIndex != -1) {
            const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
            const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
            const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
                ? getMyLastAppliedOpTime()
                : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime();

            if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
                _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) {
                const OpTime latestOpTime = _latestKnownOpTime();

                if (_iAmPrimary()) {
                    if (_leaderMode == LeaderMode::kSteppingDown) {
                        return HeartbeatResponseAction::makeNoAction();
                    }
                    log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
                          << ") because " << highestPriorityMember.getHostAndPort()
                          << " has higher priority " << highestPriorityMember.getPriority()
                          << " and is only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind me";
                    const Date_t until =
                        now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
                    if (_electionSleepUntil < until) {
                        _electionSleepUntil = until;
                    }
                    return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
                } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
                    // If this node is the highest priority node, and it is not in
                    // an inter-election sleep period, ask the current primary to step down.
                    // This is an optimization, because the remote primary will almost certainly
                    // notice this node's electability promptly, via its own heartbeat process.
                    log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
                          << " (priority " << currentPrimaryMember.getPriority()
                          << ") step down because I have higher priority "
                          << highestPriorityMember.getPriority() << " and am only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind it";
                    int primaryIndex = _currentPrimaryIndex;
                    _currentPrimaryIndex = -1;
                    return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
                }
            }
        }
    }

    // Scan the member list's heartbeat data for who is primary, and update
    // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
    {
        int remotePrimaryIndex = -1;
        for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            if (itIndex == _selfIndex) {
                continue;
            }

            if (it->getState().primary() && it->up()) {
                if (remotePrimaryIndex != -1) {
                    // two other nodes think they are primary (asynchronously polled)
                    // -- wait for things to settle down.
                    warning() << "two remote primaries (transiently)";
                    return HeartbeatResponseAction::makeNoAction();
                }
                remotePrimaryIndex = itIndex;
            }
        }

        if (remotePrimaryIndex != -1) {
            // If it's the same as last time, don't do anything further.
            if (_currentPrimaryIndex == remotePrimaryIndex) {
                return HeartbeatResponseAction::makeNoAction();
            }
            // Clear last heartbeat message on ourselves (why?)
            setMyHeartbeatMessage(now, "");

            // If we are also primary, this is a problem. Determine who should step down.
            if (_iAmPrimary()) {
                Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime();
                log() << "another primary seen with election time " << remoteElectionTime
                      << " my election time is " << _electionTime;

                // Step down whomever has the older election time.
                if (remoteElectionTime > _electionTime) {
                    if (_leaderMode == LeaderMode::kSteppingDown) {
                        return HeartbeatResponseAction::makeNoAction();
                    }
                    log() << "stepping down; another primary was elected more recently";
                    return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
                } else {
                    log() << "another PRIMARY detected and it should step down"
                             " since it was elected earlier than me";
                    return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
                }
            }

            _currentPrimaryIndex = remotePrimaryIndex;
            return HeartbeatResponseAction::makeNoAction();
        }
    }

    ////////////////////
    // Phase 2
    ////////////////////

    // We do not believe any remote to be primary.

    // If we are primary, check if we can still see majority of the set;
    // stepdown if we can't.
    if (_iAmPrimary()) {
        if (CannotSeeMajority &
            _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
            if (_leaderMode == LeaderMode::kSteppingDown) {
                return HeartbeatResponseAction::makeNoAction();
            }
            log() << "can't see a majority of the set, relinquishing primary";
            return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
        }

        LOG(2) << "Choosing to remain primary";
        return HeartbeatResponseAction::makeNoAction();
    }

    fassert(18505, _currentPrimaryIndex == -1);

    const MemberState currentState = getMemberState();
    if (originalState.recovering() && currentState.secondary()) {
        // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
        // received a heartbeat with an auth error when previously all the heartbeats we'd
        // received had auth errors. In this case, don't return makeElectAction() because
        // that could cause the election to start before the ReplicationCoordinator has updated
        // its notion of the member state to SECONDARY. Instead return noAction so that the
        // ReplicationCooridinator knows to update its tracking of the member state off of the
        // TopologyCoordinator, and leave starting the election until the next heartbeat comes
        // back.
        return HeartbeatResponseAction::makeNoAction();
    }

    // At this point, there is no primary anywhere. Check to see if we should become a candidate.
    const auto status = checkShouldStandForElection(now);
    if (!status.isOK()) {
        // NOTE: This log line is checked in unit test(s).
        LOG(2) << "TopologyCoordinator::_updatePrimaryFromHBData - " << status.reason();
        return HeartbeatResponseAction::makeNoAction();
    }
    fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
    return HeartbeatResponseAction::makeElectAction();
}

// Returns OK when this node may stand for election now, otherwise NodeNotElectable with a
// human-readable reason (known primary, already a candidate, unelectable per
// _getMyUnelectableReason, or still inside the _electionSleepUntil window).
Status TopologyCoordinator::checkShouldStandForElection(Date_t now) const {
    if (_currentPrimaryIndex != -1) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
    }
    invariant(_role != Role::kLeader);

    if (_role == Role::kCandidate) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
    }

    const UnelectableReasonMask unelectableReason =
        _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout);
    if (NotCloseEnoughToLatestOptime & unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)
                              << "; my last optime is "
                              << getMyLastAppliedOpTime().toString()
                              << " and the newest is "
                              << _latestKnownOpTime().toString()};
    }
    if (unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)};
    }
    if (_electionSleepUntil > now) {
        if (_rsConfig.getProtocolVersion() == 1) {
            return {
                ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election before "
                              << dateToISOStringLocal(_electionSleepUntil)
                              << " because I stood up or learned about a new term too recently"};
        } else {
            return {ErrorCodes::NodeNotElectable,
                    str::stream() << "Not standing for election before "
                                  << dateToISOStringLocal(_electionSleepUntil)
                                  << " because I stood too recently"};
        }
    }
    // All checks passed. Start election proceedings.
    return Status::OK();
}

// Returns true when the votes of up members (plus our own) form a strict majority of the
// set's total voting members.
bool TopologyCoordinator::_aMajoritySeemsToBeUp() const {
    int vUp = 0;
    for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex || it->up()) {
            vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
        }
    }

    return vUp * 2 > _rsConfig.getTotalVotingMembers();
}

// Returns the index of an up RS_PRIMARY member (other than the candidate) whose priority is
// >= the candidate's, or -1 when none exists.
int TopologyCoordinator::_findHealthyPrimaryOfEqualOrGreaterPriority(
    const int candidateIndex) const {
    const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
    for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
        if (!it->up() || it->getState() != MemberState::RS_PRIMARY) {
            continue;
        }
        const int itIndex = indexOfIterator(_memberData, it);
        const double priority = _rsConfig.getMemberAt(itIndex).getPriority();
        if (itIndex != candidateIndex && priority >= candidatePriority) {
            return itIndex;
        }
    }

    return -1;
}

// Returns true when 'otherOpTime' is within 10 seconds of the newest optime we know of.
bool TopologyCoordinator::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
    const OpTime latestKnownOpTime = _latestKnownOpTime();
    // Use addition instead of subtraction to avoid overflow.
    return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
}

bool TopologyCoordinator::_amIFreshEnoughForPriorityTakeover() const {
    const OpTime latestKnownOpTime = _latestKnownOpTime();

    // Rules are:
    // - If the terms don't match, we don't call for priority takeover.
    // - If our optime and the latest optime happen in different seconds, our optime must be within
    // at least priorityTakeoverFreshnessWindowSeconds seconds of the latest optime.
    // - If our optime and the latest optime happen in the same second, our optime must be within
    // at least 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp
    // must be within 1000). This is to handle the case where a primary had its clock set far into
    // the future, took some writes, then had its clock set back. In that case the timestamp
    // component of all future oplog entries generated will be the same, until real world time
    // passes the timestamp component of the last oplog entry.

    const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
    if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) {
        return false;
    }

    if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) {
        return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >=
            latestKnownOpTime.getTimestamp().getSecs();
    } else {
        return ourLastOpApplied.getTimestamp().getInc() + 1000 >=
            latestKnownOpTime.getTimestamp().getInc();
    }
}

bool TopologyCoordinator::_amIFreshEnoughForCatchupTakeover() const {
    const OpTime latestKnownOpTime = _latestKnownOpTime();

    // Rules are:
    // - We must have the freshest optime of all the up nodes.
    // - We must specifically have a fresher optime than the primary (can't be equal).
    // - The term of our last applied op must be less than the current term. This ensures that no
    // writes have happened since the most recent election and that the primary is still in
    // catchup mode.

    // There is no point to a catchup takeover if we aren't the freshest node because
    // another node would immediately perform another catchup takeover when we become primary.
    const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
    if (ourLastOpApplied < latestKnownOpTime) {
        return false;
    }

    if (_currentPrimaryIndex == -1) {
        return false;
    }

    // If we aren't ahead of the primary, there is no point to having a catchup takeover.
    const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime();

    if (ourLastOpApplied <= primaryLastOpApplied) {
        return false;
    }

    // If the term of our last applied op is less than the current term, the primary didn't write
    // anything and it is still in catchup mode.
    return ourLastOpApplied.getTerm() < _term;
}

// Returns true iff our role is kLeader; asserts the leader invariants
// (_currentPrimaryIndex points at self, leader mode is set) when it is.
bool TopologyCoordinator::_iAmPrimary() const {
    if (_role == Role::kLeader) {
        invariant(_currentPrimaryIndex == _selfIndex);
        invariant(_leaderMode != LeaderMode::kNotLeader);
        return true;
    }
    return false;
}

// Returns the newest applied optime among ourselves and all up, non-removed members
// (remote optimes come from heartbeat data).
OpTime TopologyCoordinator::_latestKnownOpTime() const {
    OpTime latest = getMyLastAppliedOpTime();
    for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) {
        // Ignore self
        // TODO(russotto): Simplify when heartbeat and spanning tree times are combined.
        if (it->isSelf()) {
            continue;
        }
        // Ignore down members
        if (!it->up()) {
            continue;
        }
        // Ignore removed nodes (not in config, so not valid).
        if (it->getState().removed()) {
            continue;
        }

        OpTime optime = it->getHeartbeatAppliedOpTime();

        if (optime > latest) {
            latest = optime;
        }
    }

    return latest;
}

// Priority comparison that treats index -1 as "no member" (loses to any real member).
bool TopologyCoordinator::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const {
    if (memberOneIndex == -1)
        return false;

    if (memberTwoIndex == -1)
        return true;

    return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
        _rsConfig.getMemberAt(memberTwoIndex).getPriority();
}

// Returns the index of the electable member with the highest priority, or -1 when no member
// is electable right now.
int TopologyCoordinator::_getHighestPriorityElectableIndex(Date_t now) const {
    int maxIndex = -1;
    for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
        UnelectableReasonMask reason = currentIndex == _selfIndex
            ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)
            : _getUnelectableReason(currentIndex);
        if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
            maxIndex = currentIndex;
        }
    }

    return maxIndex;
}

// NOTE(review): this function continues past the end of this chunk; only the opening guard
// is visible here.
bool TopologyCoordinator::prepareForUnconditionalStepDown() {
    if (_leaderMode == LeaderMode::kSteppingDown) {
        // Can only be processing one required stepdown at a time.
        return false;
    }
    // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so
    // it's safe to transition from kAttemptingStepDown to kSteppingDown.
    // (Continuation of prepareForUnconditionalStepDown, begun above.)
    _setLeaderMode(LeaderMode::kSteppingDown);
    return true;
}

// Begins a conditional (command-initiated) stepdown attempt. Fails if a stepdown is
// already in progress or if this node is not primary.
Status TopologyCoordinator::prepareForStepDownAttempt() {
    if (_leaderMode == LeaderMode::kSteppingDown ||
        _leaderMode == LeaderMode::kAttemptingStepDown) {
        return Status{ErrorCodes::ConflictingOperationInProgress,
                      "This node is already in the process of stepping down"};
    }

    if (_leaderMode == LeaderMode::kNotLeader) {
        return Status{ErrorCodes::NotMaster, "This node is not a primary."};
    }

    _setLeaderMode(LeaderMode::kAttemptingStepDown);
    return Status::OK();
}

// Reverts an in-flight command-initiated stepdown attempt back to kMaster.
// A no-op in any other leader mode (e.g. if a heartbeat stepdown superseded it).
void TopologyCoordinator::abortAttemptedStepDownIfNeeded() {
    if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
        _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster);
    }
}

// Test-only helper: forces this node into the given member state, running the same
// transition logic (win election, follower mode change, config reset) as production
// paths, and fasserts if the transition did not take effect.
void TopologyCoordinator::changeMemberState_forTest(const MemberState& newMemberState,
                                                    const Timestamp& electionTime) {
    invariant(_selfIndex != -1);
    if (newMemberState == getMemberState())
        return;
    switch (newMemberState.s) {
        case MemberState::RS_PRIMARY:
            _role = Role::kCandidate;
            processWinElection(OID(), electionTime);
            invariant(_role == Role::kLeader);
            break;
        case MemberState::RS_SECONDARY:
        case MemberState::RS_ROLLBACK:
        case MemberState::RS_RECOVERING:
        case MemberState::RS_STARTUP2:
            _role = Role::kFollower;
            _followerMode = newMemberState.s;
            // If we believed ourselves primary, renounce it.
            if (_currentPrimaryIndex == _selfIndex) {
                _currentPrimaryIndex = -1;
                _setLeaderMode(LeaderMode::kNotLeader);
            }
            break;
        case MemberState::RS_STARTUP:
            // Dropping back to STARTUP clears the config entirely.
            updateConfig(ReplSetConfig(), -1, Date_t());
            break;
        default:
            severe() << "Cannot switch to state " << newMemberState;
            invariant(false);
    }
    if (getMemberState() != newMemberState.s) {
        severe() << "Expected to enter state " << newMemberState << " but am now in "
                 << getMemberState();
        invariant(false);
    }
    log() << newMemberState;
}

// Test-only helper: sets the node this coordinator believes is primary, synthesizing
// an up heartbeat for that member so downstream state agrees.
void TopologyCoordinator::_setCurrentPrimaryForTest(int primaryIndex) {
    if (primaryIndex == _selfIndex) {
        changeMemberState_forTest(MemberState::RS_PRIMARY);
    } else {
        if (_iAmPrimary()) {
            changeMemberState_forTest(MemberState::RS_SECONDARY);
        }
        if (primaryIndex != -1) {
            // Fabricate a PRIMARY heartbeat response from the chosen member.
            ReplSetHeartbeatResponse hbResponse;
            hbResponse.setState(MemberState::RS_PRIMARY);
            hbResponse.setElectionTime(Timestamp());
            hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
            hbResponse.setSyncingTo(HostAndPort());
            hbResponse.setHbMsg("");
            _memberData.at(primaryIndex)
                .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
                             std::move(hbResponse));
        }
        _currentPrimaryIndex = primaryIndex;
    }
}

// Returns the config entry of the member we believe is primary, or NULL when no
// primary is known.
const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
    if (_currentPrimaryIndex == -1)
        return NULL;

    return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
}

// Builds the replSetGetStatus response into 'response'; overall command outcome is
// reported through 'result'. (Body continues below.)
void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
                                                BSONObjBuilder* response,
                                                Status* result) {
    // output for each member
    std::vector membersOut;
    const MemberState myState = getMemberState();
    const Date_t now = rsStatusArgs.now;
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();
    const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus;

    if (_selfIndex == -1) {
        // We're REMOVED or have an invalid config; report a minimal self-only status
        // and fail the command with InvalidReplicaSetConfig.
        response->append("state", static_cast(myState.s));
        response->append("stateStr", myState.toString());
        response->append("uptime", rsStatusArgs.selfUptime);

        appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion());

        response->appendDate("optimeDate",
                             Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
        if (_maintenanceModeCalls) {
            response->append("maintenanceMode", _maintenanceModeCalls);
        }
        std::string s = _getHbmsg(now);
        if (!s.empty())
            response->append("infoMessage", s);
        *result = Status(ErrorCodes::InvalidReplicaSetConfig,
                         "Our replica set config is invalid or we are not a member of it");
        return;
    }

    for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex) {
            // add self
            BSONObjBuilder bb;
            // (Continuation of prepareStatusResponse: self member entry.)
            bb.append("_id", _selfConfig().getId());
            bb.append("name", _selfConfig().getHostAndPort().toString());
            bb.append("health", 1.0);
            bb.append("state", static_cast(myState.s));
            bb.append("stateStr", myState.toString());
            bb.append("uptime", rsStatusArgs.selfUptime);

            // Arbiters carry no oplog, so no optime fields for them.
            if (!_selfConfig().isArbiter()) {
                appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
                bb.appendDate("optimeDate",
                              Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
            }

            if (!_syncSource.empty() && !_iAmPrimary()) {
                bb.append("syncingTo", _syncSource.toString());
            }

            if (_maintenanceModeCalls) {
                bb.append("maintenanceMode", _maintenanceModeCalls);
            }

            std::string s = _getHbmsg(now);
            if (!s.empty())
                bb.append("infoMessage", s);

            if (myState.primary()) {
                bb.append("electionTime", _electionTime);
                bb.appendDate("electionDate",
                              Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
            }
            bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
            bb.append("self", true);
            membersOut.push_back(bb.obj());
        } else {
            // add non-self member: state comes from its most recent heartbeat.
            const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
            BSONObjBuilder bb;
            bb.append("_id", itConfig.getId());
            bb.append("name", itConfig.getHostAndPort().toString());
            double h = it->getHealth();
            bb.append("health", h);
            const MemberState state = it->getState();
            bb.append("state", static_cast(state.s));
            if (h == 0) {
                // if we can't connect the state info is from the past
                // and could be confusing to show
                bb.append("stateStr", "(not reachable/healthy)");
            } else {
                bb.append("stateStr", it->getState().toString());
            }

            // Uptime is seconds since we first saw this member up; 0 if never up.
            const unsigned int uptime = static_cast((
                it->getUpSince() != Date_t() ? durationCount(now - it->getUpSince()) : 0));
            bb.append("uptime", uptime);
            if (!itConfig.isArbiter()) {
                appendOpTime(
                    &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion());
                appendOpTime(&bb,
                             "optimeDurable",
                             it->getHeartbeatDurableOpTime(),
                             _rsConfig.getProtocolVersion());

                bb.appendDate("optimeDate",
                              Date_t::fromDurationSinceEpoch(
                                  Seconds(it->getHeartbeatAppliedOpTime().getSecs())));
                bb.appendDate("optimeDurableDate",
                              Date_t::fromDurationSinceEpoch(
                                  Seconds(it->getHeartbeatDurableOpTime().getSecs())));
            }
            bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
            bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
            Milliseconds ping = _getPing(itConfig.getHostAndPort());
            bb.append("pingMs", durationCount(ping));
            std::string s = it->getLastHeartbeatMsg();
            if (!s.empty())
                bb.append("lastHeartbeatMessage", s);

            if (it->hasAuthIssue()) {
                bb.append("authenticated", false);
            }

            const HostAndPort& syncSource = it->getSyncSource();
            if (!syncSource.empty() && !state.primary()) {
                bb.append("syncingTo", syncSource.toString());
            }

            if (state == MemberState::RS_PRIMARY) {
                bb.append("electionTime", it->getElectionTime());
                bb.appendDate(
                    "electionDate",
                    Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
            }
            bb.appendIntOrLL("configVersion", it->getConfigVersion());
            membersOut.push_back(bb.obj());
        }
    }

    // sort members bson
    sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan());

    // Top-level status fields.
    response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
    response->append("date", now);
    response->append("myState", myState.s);
    response->append("term", _term);

    // Add sync source info
    if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
        response->append("syncingTo", _syncSource.toString());
    }

    if (_rsConfig.isConfigServer()) {
        response->append("configsvr", true);
    }

    response->append("heartbeatIntervalMillis",
                     durationCount(_rsConfig.getHeartbeatInterval()));

    // New optimes, to hold them all.
    BSONObjBuilder optimes;
    _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
    if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
        rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
    }

    appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
    appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
    response->append("optimes", optimes.obj());

    if (!initialSyncStatus.isEmpty()) {
        response->append("initialSyncStatus", initialSyncStatus);
    }

    response->append("members", membersOut);
    *result = Status::OK();
}

// Builds a replSetUpdatePosition command reporting the durable/applied optimes of
// ourself and each live member we track, plus replication metadata.
// (Body continues below.)
StatusWith TopologyCoordinator::prepareReplSetUpdatePositionCommand(
    OpTime currentCommittedSnapshotOpTime) const {
    BSONObjBuilder cmdBuilder;
    invariant(_rsConfig.isInitialized());
    // Do not send updates if we have been removed from the config.
    if (_selfIndex == -1) {
        return Status(ErrorCodes::NodeNotFound,
                      "This node is not in the current replset configuration.");
    }
    cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
    // Create an array containing objects each live member connected to us and for ourself.
    BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
    for (const auto& memberData : _memberData) {
        if (memberData.getLastAppliedOpTime().isNull()) {
            // Don't include info on members we haven't heard from yet.
            continue;
        }
        // Don't include members we think are down.
        // (Continuation of prepareReplSetUpdatePositionCommand.)
        if (!memberData.isSelf() && memberData.lastUpdateStale()) {
            continue;
        }

        BSONObjBuilder entry(arrayBuilder.subobjStart());
        memberData.getLastDurableOpTime().append(&entry,
                                                 UpdatePositionArgs::kDurableOpTimeFieldName);
        memberData.getLastAppliedOpTime().append(&entry,
                                                 UpdatePositionArgs::kAppliedOpTimeFieldName);
        entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
        entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
    }
    arrayBuilder.done();

    // Add metadata to command
    prepareReplSetMetadata(currentCommittedSnapshotOpTime)
        .writeToMetadata(&cmdBuilder)
        .transitional_ignore();
    return cmdBuilder.obj();
}

// Appends a "replicationProgress" array to 'result' with each tracked member's rid,
// last durable optime (shape depends on protocol version), host, and — when we have a
// valid config — member id.
void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
    BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
    {
        for (const auto& memberData : _memberData) {
            BSONObjBuilder entry(replicationProgress.subobjStart());
            entry.append("rid", memberData.getRid());
            const auto lastDurableOpTime = memberData.getLastDurableOpTime();
            if (_rsConfig.getProtocolVersion() == 1) {
                // PV1 reports optime as {ts, term}.
                BSONObjBuilder opTime(entry.subobjStart("optime"));
                opTime.append("ts", lastDurableOpTime.getTimestamp());
                opTime.append("term", lastDurableOpTime.getTerm());
                opTime.done();
            } else {
                // PV0 reports a bare timestamp.
                entry.append("optime", lastDurableOpTime.getTimestamp());
            }
            entry.append("host", memberData.getHostAndPort().toString());
            if (_selfIndex >= 0) {
                const int memberId = memberData.getMemberId();
                invariant(memberId >= 0);
                entry.append("memberId", memberId);
            }
        }
    }
}

// Populates the isMaster response: host lists (hosts/passives/arbiters), set name and
// version, primary/secondary flags, self-config attributes, and tags.
void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* response) {
    const MemberState myState = getMemberState();
    if (!_rsConfig.isInitialized()) {
        response->markAsNoConfig();
        return;
    }

    for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
         ++it) {
        // Hidden and slave-delayed members are never advertised.
        if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
            continue;
        }

        if (it->isElectable()) {
            response->addHost(it->getHostAndPort());
        } else if (it->isArbiter()) {
            response->addArbiter(it->getHostAndPort());
        } else {
            response->addPassive(it->getHostAndPort());
        }
    }

    response->setReplSetName(_rsConfig.getReplSetName());
    if (myState.removed()) {
        // We have a config but are not part of it.
        response->markAsNoConfig();
        return;
    }
    response->setReplSetVersion(_rsConfig.getConfigVersion());
    response->setIsMaster(myState.primary());
    response->setIsSecondary(myState.secondary());

    const MemberConfig* curPrimary = _currentPrimaryMember();
    if (curPrimary) {
        response->setPrimary(curPrimary->getHostAndPort());
    }

    const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
    if (selfConfig.isArbiter()) {
        response->setIsArbiterOnly(true);
    } else if (selfConfig.getPriority() == 0) {
        response->setIsPassive(true);
    }
    if (selfConfig.getSlaveDelay() > Seconds(0)) {
        response->setSlaveDelay(selfConfig.getSlaveDelay());
    }
    if (selfConfig.isHidden()) {
        response->setIsHidden(true);
    }
    if (!selfConfig.shouldBuildIndexes()) {
        response->setShouldBuildIndexes(false);
    }
    const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
    if (selfConfig.hasTags(tagConfig)) {
        for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
             ++tag) {
            std::string tagKey = tagConfig.getTagKey(*tag);
            if (tagKey[0] == '$') {
                // Filter out internal tags
                continue;
            }
            response->addTag(tagKey, tagConfig.getTagValue(*tag));
        }
    }
    response->setMe(selfConfig.getHostAndPort());
    if (_iAmPrimary()) {
        response->setElectionId(_electionId);
    }
}

// Handles the replSetFreeze command: secs == 0 unfreezes (possibly triggering an
// election in a single-node set), secs > 0 freezes election participation until
// now + secs. Only valid on followers. (Body continues below.)
StatusWith TopologyCoordinator::prepareFreezeResponse(Date_t now,
                                                      int secs,
                                                      BSONObjBuilder* response) {
    if (_role != TopologyCoordinator::Role::kFollower) {
        std::string msg = str::stream()
            << "cannot freeze node when primary or running for election. state: "
            << (_role == TopologyCoordinator::Role::kLeader ?
                // (Continuation of prepareFreezeResponse.)
                "Primary" : "Running-Election");
        log() << msg;
        return Status(ErrorCodes::NotSecondary, msg);
    }

    if (secs == 0) {
        // Unfreeze: clear the stepdown window immediately.
        _stepDownUntil = now;
        log() << "'unfreezing'";
        response->append("info", "unfreezing");

        if (_isElectableNodeInSingleNodeReplicaSet()) {
            // If we are a one-node replica set, we're the one member,
            // we're electable, we're not in maintenance mode, and we are currently in followerMode
            // SECONDARY, we must transition to candidate now that our stepdown period
            // is no longer active, in lieu of heartbeats.
            _role = Role::kCandidate;
            return PrepareFreezeResponseResult::kElectSelf;
        }
    } else {
        if (secs == 1)
            response->append("warning", "you really want to freeze for only 1 second?");

        // Never shorten an existing freeze window.
        _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
        log() << "'freezing' for " << secs << " seconds";
    }

    return PrepareFreezeResponseResult::kNoAction;
}

// After a stepdown/freeze window expires, returns true (and becomes candidate) when
// this node is the lone electable member of a single-node set.
bool TopologyCoordinator::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
    if (_stepDownUntil > now) {
        return false;
    }

    if (_isElectableNodeInSingleNodeReplicaSet()) {
        // If the new config describes a one-node replica set, we're the one member,
        // we're electable, we're not in maintenance mode, and we are currently in followerMode
        // SECONDARY, we must transition to candidate, in lieu of heartbeats.
        _role = Role::kCandidate;
        return true;
    }
    return false;
}

// Extends (never shortens) the election backoff deadline.
void TopologyCoordinator::setElectionSleepUntil(Date_t newTime) {
    if (_electionSleepUntil < newTime) {
        _electionSleepUntil = newTime;
    }
}

Timestamp TopologyCoordinator::getElectionTime() const {
    return _electionTime;
}

OID TopologyCoordinator::getElectionId() const {
    return _electionId;
}

int TopologyCoordinator::getCurrentPrimaryIndex() const {
    return _currentPrimaryIndex;
}

Date_t TopologyCoordinator::getStepDownTime() const {
    return _stepDownUntil;
}

// Rebuilds _memberData to match 'newConfig', carrying over heartbeat state for members
// that persist across the reconfig (matched by member id + host, or by being self).
void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
                                                          int selfIndex,
                                                          Date_t now) {
    std::vector oldHeartbeats;
    _memberData.swap(oldHeartbeats);

    int index = 0;
    for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd();
         ++it, ++index) {
        const MemberConfig& newMemberConfig = *it;
        MemberData newHeartbeatData;
        for (auto&& oldMemberData : oldHeartbeats) {
            if ((oldMemberData.getMemberId() == newMemberConfig.getId() &&
                 oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) ||
                (index == selfIndex && oldMemberData.isSelf())) {
                // This member existed in the old config with the same member ID and
                // HostAndPort, so copy its heartbeat data over.
                newHeartbeatData = oldMemberData;
                break;
            }
        }
        newHeartbeatData.setConfigIndex(index);
        newHeartbeatData.setIsSelf(index == selfIndex);
        newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort());
        newHeartbeatData.setMemberId(newMemberConfig.getId());
        _memberData.push_back(newHeartbeatData);
    }
    if (selfIndex < 0) {
        // It's necessary to have self member data even if self isn't in the configuration.
        // We don't need data for the other nodes (which no longer know about us, or soon won't)
        _memberData.clear();
        // We're not in the config, we can't sync any more.
        _syncSource = HostAndPort();
        MemberData newHeartbeatData;
        for (auto&& oldMemberData : oldHeartbeats) {
            if (oldMemberData.isSelf()) {
                newHeartbeatData = oldMemberData;
                break;
            }
        }
        newHeartbeatData.setConfigIndex(-1);
        newHeartbeatData.setIsSelf(true);
        _memberData.push_back(newHeartbeatData);
    }
}

// This function installs a new config object and recreates MemberData objects
// that reflect the new config. It also resets the term when the protocol version
// changes, and handles losing/keeping primaryship across the reconfig.
void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) {
    invariant(_role != Role::kCandidate);
    invariant(selfIndex < newConfig.getNumMembers());

    // Reset term on startup and upgrade/downgrade of protocol version.
    if (!_rsConfig.isInitialized() ||
        _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) {
        if (newConfig.getProtocolVersion() == 1) {
            _term = OpTime::kInitialTerm;
        } else {
            invariant(newConfig.getProtocolVersion() == 0);
            _term = OpTime::kUninitializedTerm;
        }
        LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config";
    }

    _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
    _rsConfig = newConfig;
    _selfIndex = selfIndex;
    _forceSyncSourceIndex = -1;

    if (_role == Role::kLeader) {
        if (_selfIndex == -1) {
            log() << "Could not remain primary because no longer a member of the replica set";
        } else if (!_selfConfig().isElectable()) {
            log() << " Could not remain primary because no longer electable";
        } else {
            // Don't stepdown if you don't have to.
            _currentPrimaryIndex = _selfIndex;
            return;
        }
        _role = Role::kFollower;
        _setLeaderMode(LeaderMode::kNotLeader);
    }

    // By this point we know we are in Role::kFollower
    _currentPrimaryIndex = -1;  // force secondaries to re-detect who the primary is

    if (_isElectableNodeInSingleNodeReplicaSet()) {
        // If the new config describes a one-node replica set, we're the one member,
        // we're electable, we're not in maintenance mode and we are currently in followerMode
        // SECONDARY, we must transition to candidate, in lieu of heartbeats.
        // (Continuation of updateConfig.)
        _role = Role::kCandidate;
    }
}

// Returns this node's heartbeat message, or "" when the message has expired.
std::string TopologyCoordinator::_getHbmsg(Date_t now) const {
    // ignore messages over 2 minutes old
    if ((now - _hbmsgTime) > Seconds{120}) {
        return "";
    }
    return _hbmsg;
}

void TopologyCoordinator::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
    _hbmsgTime = now;
    _hbmsg = message;
}

const MemberConfig& TopologyCoordinator::_selfConfig() const {
    return _rsConfig.getMemberAt(_selfIndex);
}

const MemberData& TopologyCoordinator::_selfMemberData() const {
    return _memberData[_selfMemberDataIndex()];
}

MemberData& TopologyCoordinator::_selfMemberData() {
    return _memberData[_selfMemberDataIndex()];
}

// Index of our own entry in _memberData.
const int TopologyCoordinator::_selfMemberDataIndex() const {
    invariant(!_memberData.empty());
    if (_selfIndex >= 0)
        return _selfIndex;
    // In master-slave mode, the first entry is for self. If there is no config
    // or we're not in the config, the first-and-only entry should be for self.
    return 0;
}

// Computes why another member (never self — use _getMyUnelectableReason for that)
// cannot be elected, based on its config and latest heartbeat data. Returns a bitmask
// of reasons; None means electable.
TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getUnelectableReason(
    int index) const {
    invariant(index != _selfIndex);
    const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
    const MemberData& hbData = _memberData.at(index);
    UnelectableReasonMask result = None;
    if (memberConfig.isArbiter()) {
        result |= ArbiterIAm;
    }
    if (memberConfig.getPriority() <= 0) {
        result |= NoPriority;
    }
    if (hbData.getState() != MemberState::RS_SECONDARY) {
        result |= NotSecondary;
    }
    // PV0 only: reject members lagging the freshest known optime by too much.
    if (_rsConfig.getProtocolVersion() == 0 &&
        !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
        result |= NotCloseEnoughToLatestOptime;
    }
    if (hbData.up() && hbData.isUnelectable()) {
        result |= RefusesToStand;
    }
    invariant(result || memberConfig.isElectable());
    return result;
}

// Computes why this node cannot currently stand for election, for the given election
// reason. Returns a bitmask of reasons; None means electable. Note the early return
// when not in the config — later checks depend on _selfIndex being valid.
TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectableReason(
    const Date_t now, StartElectionReason reason) const {
    UnelectableReasonMask result = None;
    const OpTime lastApplied = getMyLastAppliedOpTime();
    if (lastApplied.isNull()) {
        result |= NoData;
    }
    if (!_aMajoritySeemsToBeUp()) {
        result |= CannotSeeMajority;
    }
    if (_selfIndex == -1) {
        result |= NotInitialized;
        return result;
    }
    if (_selfConfig().isArbiter()) {
        result |= ArbiterIAm;
    }
    if (_selfConfig().getPriority() <= 0) {
        result |= NoPriority;
    }
    if (_stepDownUntil > now) {
        result |= StepDownPeriodActive;
    }

    // Cannot be electable unless secondary or already primary
    if (!getMemberState().secondary() && !_iAmPrimary()) {
        result |= NotSecondary;
    }

    if (_rsConfig.getProtocolVersion() == 0) {
        // Election rules only for protocol version 0.
        if (_voteLease.whoId != -1 &&
            _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
            _voteLease.when + VoteLease::leaseTime >= now) {
            result |= VotedTooRecently;
        }
        if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) {
            result |= NotCloseEnoughToLatestOptime;
        }
    } else {
        // Election rules only for protocol version 1.
        invariant(_rsConfig.getProtocolVersion() == 1);
        if (reason == StartElectionReason::kPriorityTakeover &&
            !_amIFreshEnoughForPriorityTakeover()) {
            result |= NotCloseEnoughToLatestForPriorityTakeover;
        }
        if (reason == StartElectionReason::kCatchupTakeover &&
            !_amIFreshEnoughForCatchupTakeover()) {
            result |= NotFreshEnoughForCatchupTakeover;
        }
    }
    return result;
}

// Renders an UnelectableReasonMask as a "; "-joined human-readable string, ending with
// the raw mask in hex. fasserts on an empty/unknown mask.
std::string TopologyCoordinator::_getUnelectableReasonString(const UnelectableReasonMask ur) const {
    invariant(ur);
    str::stream ss;
    bool hasWrittenToStream = false;
    if (ur & NoData) {
        ss << "node has no applied oplog entries";
        hasWrittenToStream = true;
    }
    if (ur & VotedTooRecently) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
    }
    if (ur & CannotSeeMajority) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "I cannot see a majority";
    }
    if (ur & ArbiterIAm) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member is an arbiter";
    }
    if (ur & NoPriority) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member has zero priority";
    }
    if (ur & StepDownPeriodActive) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "I am still waiting for stepdown period to end at "
           << dateToISOStringLocal(_stepDownUntil);
    }
    if (ur & NotSecondary) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member is not currently a secondary";
    }
    if (ur & NotCloseEnoughToLatestOptime) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member is more than 10 seconds behind the most up-to-date member";
    }
    if (ur & NotCloseEnoughToLatestForPriorityTakeover) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member is not caught up enough to the most up-to-date member to call for priority "
              "takeover - must be within "
           << priorityTakeoverFreshnessWindowSeconds << " seconds";
    }
    if (ur & NotFreshEnoughForCatchupTakeover) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "member is either not the most up-to-date member or not ahead of the primary, and "
              "therefore cannot call for catchup takeover";
    }
    if (ur & NotInitialized) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "node is not a member of a valid replica set configuration";
    }
    if (ur & RefusesToStand) {
        if (hasWrittenToStream) {
            ss << "; ";
        }
        hasWrittenToStream = true;
        ss << "most recent heartbeat indicates node will not stand for election";
    }
    if (!hasWrittenToStream) {
        severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
        fassertFailed(26011);
    }
    ss << " (mask 0x" << integerToHex(ur) << ")";
    return ss;
}

// Most recent ping time for 'host'. NOTE: operator[] default-inserts an entry for
// hosts never pinged.
Milliseconds TopologyCoordinator::_getPing(const HostAndPort& host) {
    return _pings[host].getMillis();
}

void TopologyCoordinator::_setElectionTime(const Timestamp& newElectionTime) {
    _electionTime = newElectionTime;
}

// Sums ping counts across all tracked hosts. (Body continues below.)
int TopologyCoordinator::_getTotalPings() {
    PingMap::iterator it = _pings.begin();
    PingMap::iterator end = _pings.end();
    int
        // (Continuation of _getTotalPings.)
        totalPings = 0;
    while (it != end) {
        totalPings += it->second.getCount();
        it++;
    }
    return totalPings;
}

// Returns the hosts of all members (excluding self) that might be up, for use as
// reconnection targets.
std::vector TopologyCoordinator::getMaybeUpHostAndPorts() const {
    std::vector upHosts;
    for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex) {
            continue;  // skip ourselves
        }
        if (!it->maybeUp()) {
            continue;  // skip DOWN nodes
        }

        upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
    }
    return upHosts;
}

// PV0: casts this candidate's own vote for itself, honoring the vote lease.
// Returns false if not a candidate or the lease still belongs to another node.
bool TopologyCoordinator::voteForMyself(Date_t now) {
    if (_role != Role::kCandidate) {
        return false;
    }
    int selfId = _selfConfig().getId();
    if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
        log() << "not voting yea for " << selfId << " voted for "
              << _voteLease.whoHostAndPort.toString() << ' '
              << durationCount(now - _voteLease.when) << " secs ago";
        return false;
    }
    _voteLease.when = now;
    _voteLease.whoId = selfId;
    _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
    return true;
}

bool TopologyCoordinator::isSteppingDown() const {
    return _leaderMode == LeaderMode::kAttemptingStepDown ||
        _leaderMode == LeaderMode::kSteppingDown;
}

// Transitions _leaderMode, asserting the transition is legal per the state machine
// below. Invalid transitions crash via invariant.
void TopologyCoordinator::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
    // Invariants for valid state transitions.
    switch (_leaderMode) {
        case LeaderMode::kNotLeader:
            invariant(newMode == LeaderMode::kLeaderElect);
            break;
        case LeaderMode::kLeaderElect:
            invariant(newMode == LeaderMode::kNotLeader ||  // TODO(SERVER-30852): remove this case
                      newMode == LeaderMode::kMaster ||
                      newMode == LeaderMode::kAttemptingStepDown ||
                      newMode == LeaderMode::kSteppingDown);
            break;
        case LeaderMode::kMaster:
            invariant(newMode == LeaderMode::kNotLeader ||  // TODO(SERVER-30852): remove this case
                      newMode == LeaderMode::kAttemptingStepDown ||
                      newMode == LeaderMode::kSteppingDown);
            break;
        case LeaderMode::kAttemptingStepDown:
            invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster ||
                      newMode == LeaderMode::kSteppingDown);
            break;
        case LeaderMode::kSteppingDown:
            invariant(newMode == LeaderMode::kNotLeader);
            break;
    }
    _leaderMode = std::move(newMode);
}

// Derives this node's externally visible MemberState from config membership,
// cluster-role consistency checks, role, arbiter-ness, and maintenance mode.
MemberState TopologyCoordinator::getMemberState() const {
    if (_selfIndex == -1) {
        if (_rsConfig.isInitialized()) {
            return MemberState::RS_REMOVED;
        }
        return MemberState::RS_STARTUP;
    }

    if (_rsConfig.isConfigServer()) {
        // A config-server replica set member must be running as a config server (and
        // with a storage engine supporting readCommitted), else report REMOVED.
        if (_options.clusterRole != ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
            return MemberState::RS_REMOVED;
        } else {
            invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown);
            if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) {
                return MemberState::RS_REMOVED;
            }
        }
    } else {
        // Conversely, a config server must not be in a non-config-server set.
        if (_options.clusterRole == ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
            return MemberState::RS_REMOVED;
        }
    }

    if (_role == Role::kLeader) {
        invariant(_currentPrimaryIndex == _selfIndex);
        invariant(_leaderMode != LeaderMode::kNotLeader);
        return MemberState::RS_PRIMARY;
    }
    const MemberConfig& myConfig = _selfConfig();
    if (myConfig.isArbiter()) {
        return MemberState::RS_ARBITER;
    }
    // Maintenance mode or auth-only-error heartbeats force RECOVERING while in
    // follower mode SECONDARY.
    if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) &&
        (_followerMode == MemberState::RS_SECONDARY)) {
        return MemberState::RS_RECOVERING;
    }
    return _followerMode;
}
// Writes are accepted only in steady-state kMaster mode — not during the
// post-election drain (kLeaderElect) and not once a stepdown has begun.
bool TopologyCoordinator::canAcceptWrites() const {
    return _leaderMode == LeaderMode::kMaster;
}

// Records the election id and election time for the election we just won.
// Caller must already be leader.
void TopologyCoordinator::setElectionInfo(OID electionId, Timestamp electionOpTime) {
    invariant(_role == Role::kLeader);
    _electionTime = electionOpTime;
    _electionId = electionId;
}

// Transitions this candidate to leader-elect after winning an election:
// records election info, points the primary index at ourselves, and clears
// any sync-source state.
void TopologyCoordinator::processWinElection(OID electionId, Timestamp electionOpTime) {
    invariant(_role == Role::kCandidate);
    invariant(_leaderMode == LeaderMode::kNotLeader);
    _role = Role::kLeader;
    _setLeaderMode(LeaderMode::kLeaderElect);
    setElectionInfo(electionId, electionOpTime);
    _currentPrimaryIndex = _selfIndex;
    _syncSource = HostAndPort();
    _forceSyncSourceIndex = -1;
    // Prevent last committed optime from updating until we finish draining.
    // (Set to the maximum representable optime; see the _firstOpTimeOfMyTerm
    // check in advanceLastCommittedOpTime.)
    _firstOpTimeOfMyTerm =
        OpTime(Timestamp(std::numeric_limits::max(), 0), std::numeric_limits::max());
}

// Reverts this candidate to follower state after losing an election and
// clears the election info recorded for the attempt.
void TopologyCoordinator::processLoseElection() {
    invariant(_role == Role::kCandidate);
    invariant(_leaderMode == LeaderMode::kNotLeader);
    // NOTE(review): syncSourceAddress appears unused in this function — confirm
    // whether it can be removed.
    const HostAndPort syncSourceAddress = getSyncSourceAddress();
    _electionTime = Timestamp(0, 0);
    _electionId = OID();
    _role = Role::kFollower;

    // Clear voteLease time, if we voted for ourselves in this election.
    // This will allow us to vote for others.
if (_voteLease.whoId == _selfConfig().getId()) { _voteLease.when = Date_t(); } } bool TopologyCoordinator::attemptStepDown( long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) { if (_role != Role::kLeader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) { uasserted(ErrorCodes::PrimarySteppedDown, "While waiting for secondaries to catch up before stepping down, " "this node decided to step down for other reasons"); } invariant(_leaderMode == LeaderMode::kAttemptingStepDown); if (now >= stepDownUntil) { uasserted(ErrorCodes::ExceededTimeLimit, "By the time we were ready to step down, we were already past the " "time we were supposed to step down until"); } if (!_canCompleteStepDownAttempt(now, waitUntil, force)) { // Stepdown attempt failed. // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even // if secondaryCatchUpPeriodSecs == 0. if (now >= waitUntil) { uasserted(ErrorCodes::ExceededTimeLimit, str::stream() << "No electable secondaries caught up as of " << dateToISOStringLocal(now) << "Please use the replSetStepDown command with the argument " << "{force: true} to force node to step down."); } // Stepdown attempt failed, but in a way that can be retried return false; } // Stepdown attempt success! _stepDownUntil = stepDownUntil; _stepDownSelfAndReplaceWith(-1); return true; } bool TopologyCoordinator::_canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force) { const bool forceNow = force && (now >= waitUntil); if (forceNow) { return true; } return isSafeToStepDown(); } bool TopologyCoordinator::isSafeToStepDown() { if (!_rsConfig.isInitialized() || _selfIndex < 0) { return false; } OpTime lastApplied = getMyLastAppliedOpTime(); auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName); invariant(tagStatus.isOK()); // Check if a majority of nodes have reached the last applied optime. 
if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
        return false;
    }

    // Now check that we also have at least one caught up node that is electable.
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
        // ignore your self
        if (memberIndex == _selfIndex) {
            continue;
        }
        UnelectableReasonMask reason = _getUnelectableReason(memberIndex);
        if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) {
            // Found a caught up and electable node, succeed with step down.
            return true;
        }
    }

    return false;
}

// Sets the state this node reports while it is a follower. Only the four
// follower states below are legal; anything else trips the invariant.
void TopologyCoordinator::setFollowerMode(MemberState::MS newMode) {
    invariant(_role == Role::kFollower);
    switch (newMode) {
        case MemberState::RS_RECOVERING:
        case MemberState::RS_ROLLBACK:
        case MemberState::RS_SECONDARY:
        case MemberState::RS_STARTUP2:
            _followerMode = newMode;
            break;
        default:
            invariant(false);
    }

    if (_followerMode != MemberState::RS_SECONDARY) {
        return;
    }

    // When a single node replica set transitions to SECONDARY, we must check if we should
    // be a candidate here.  This is necessary because a single node replica set has no
    // heartbeats that would normally change the role to candidate.
if (_isElectableNodeInSingleNodeReplicaSet()) {
        _role = Role::kCandidate;
    }
}

// True when this node is the sole, electable member of a one-node replica
// set, in SECONDARY state, with no maintenance mode active.
bool TopologyCoordinator::_isElectableNodeInSingleNodeReplicaSet() const {
    return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
        _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
        _maintenanceModeCalls == 0;
}

// Completes an unconditional stepdown, handing our notion of the primary to
// the remote node that claims to be primary — but only if exactly one does.
void TopologyCoordinator::finishUnconditionalStepDown() {
    invariant(_leaderMode == LeaderMode::kSteppingDown);

    int remotePrimaryIndex = -1;
    for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end();
         ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex) {
            continue;
        }

        if (it->getState().primary() && it->up()) {
            if (remotePrimaryIndex != -1) {
                // two other nodes think they are primary (asynchronously polled)
                // -- wait for things to settle down.
                remotePrimaryIndex = -1;
                warning() << "two remote primaries (transiently)";
                break;
            }
            remotePrimaryIndex = itIndex;
        }
    }
    _stepDownSelfAndReplaceWith(remotePrimaryIndex);
}

// Transitions from leader to follower, recording 'newPrimary' (possibly -1,
// i.e. unknown) as the current primary index.
void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) {
    invariant(_role == Role::kLeader);
    invariant(_selfIndex != -1);
    invariant(_selfIndex != newPrimary);
    invariant(_selfIndex == _currentPrimaryIndex);
    _currentPrimaryIndex = newPrimary;
    _role = Role::kFollower;
    _setLeaderMode(LeaderMode::kNotLeader);
}

// Recomputes the commit point from the voting members' replication progress.
// Returns true if the last committed optime advanced.
bool TopologyCoordinator::updateLastCommittedOpTime() {
    // If we're not primary or we're stepping down due to learning of a new term then we must not
    // advance the commit point.  If we are stepping down due to a user request, however, then it
    // is safe to advance the commit point, and in fact we must since the stepdown request may be
    // waiting for the commit point to advance enough to be able to safely complete the step down.
    if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) {
        return false;
    }

    // Whether we use the applied or durable OpTime for the commit point is decided here.
const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();

    // Collect the relevant optime (durable or applied) from each voting member.
    std::vector votingNodesOpTimes;
    for (const auto& memberData : _memberData) {
        int memberIndex = memberData.getConfigIndex();
        invariant(memberIndex >= 0);
        const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
        if (memberConfig.isVoter()) {
            const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
                                                 : memberData.getLastAppliedOpTime();
            votingNodesOpTimes.push_back(opTime);
        }
    }

    invariant(votingNodesOpTimes.size() > 0);
    if (votingNodesOpTimes.size() < static_cast(_rsConfig.getWriteMajority())) {
        return false;
    }
    std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());

    // need the majority to have this OpTime
    // (after sorting, the element 'writeMajority' positions from the end is
    // the newest optime that at least a majority of voters have reached)
    OpTime committedOpTime =
        votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
    return advanceLastCommittedOpTime(committedOpTime);
}

// Advances the commit point to 'committedOpTime' if it is newer than the
// current one; stale or duplicate values are ignored. Returns true on
// advancement.
bool TopologyCoordinator::advanceLastCommittedOpTime(const OpTime& committedOpTime) {
    if (committedOpTime == _lastCommittedOpTime) {
        return false;  // Hasn't changed, so ignore it.
    } else if (committedOpTime < _lastCommittedOpTime) {
        LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
               << ", currentCommittedOpTime: " << _lastCommittedOpTime;
        return false;  // This may have come from an out-of-order heartbeat. Ignore it.
    }

    // This check is performed to ensure primaries do not commit an OpTime from a previous term.
if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
        LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
               << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
        return false;
    }

    LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
    _lastCommittedOpTime = committedOpTime;
    return true;
}

OpTime TopologyCoordinator::getLastCommittedOpTime() const {
    return _lastCommittedOpTime;
}

// A drained node may finish becoming primary only if the term is unchanged
// and we are still leader-elect (or in a stepdown attempt, which might fail).
bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const {

    if (termWhenDrainCompleted != _term) {
        return false;
    }
    // Allow completing the transition to primary even when in the middle of a stepdown attempt,
    // in case the stepdown attempt fails.
    if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) {
        return false;
    }

    return true;
}

// Finishes the transition to primary after the oplog-application drain,
// recording the first optime of our term (which unblocks commit-point
// advancement; see advanceLastCommittedOpTime).
Status TopologyCoordinator::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
    if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
        return Status(ErrorCodes::PrimarySteppedDown,
                      "By the time this node was ready to complete its transition to PRIMARY it "
                      "was no longer eligible to do so");
    }
    if (_leaderMode == LeaderMode::kLeaderElect) {
        _setLeaderMode(LeaderMode::kMaster);
    }
    _firstOpTimeOfMyTerm = firstOpTimeOfTerm;
    return Status::OK();
}

// Adjusts the maintenance-mode counter; only followers may enter or leave
// maintenance mode, and the count may never go negative.
void TopologyCoordinator::adjustMaintenanceCountBy(int inc) {
    invariant(_role == Role::kFollower);
    _maintenanceModeCalls += inc;
    invariant(_maintenanceModeCalls >= 0);
}

int TopologyCoordinator::getMaintenanceCount() const {
    return _maintenanceModeCalls;
}

// Processes a newly-seen term. Returns kAlreadyUpToDate when 'term' is not
// newer than ours, kTriggerStepDown when we are primary (the actual term
// update is deferred until after the stepdown), or kUpdatedTerm when the
// term was bumped.
TopologyCoordinator::UpdateTermResult TopologyCoordinator::updateTerm(long long term, Date_t now) {
    if (term <= _term) {
        return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
    }
    // Don't run election if we just stood up or learned about a new term.
_electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod();

    // Don't update the term just yet if we are going to step down, as we don't want to report
    // that we are primary in the new term.
    if (_iAmPrimary()) {
        return TopologyCoordinator::UpdateTermResult::kTriggerStepDown;
    }
    LOG(1) << "Updating term from " << _term << " to " << term;
    _term = term;
    return TopologyCoordinator::UpdateTermResult::kUpdatedTerm;
}

long long TopologyCoordinator::getTerm() const {
    return _term;
}

// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
// replset. Passing metadata is unnecessary.
// Decides whether this node should abandon 'currentSource' as its sync source.
bool TopologyCoordinator::shouldChangeSyncSource(
    const HostAndPort& currentSource,
    const rpc::ReplSetMetadata& replMetadata,
    boost::optional oqMetadata,
    Date_t now) const {
    // Methodology:
    // If there exists a viable sync source member other than currentSource, whose oplog has
    // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's,
    // return true.
    // If the currentSource has the same replication progress as we do and has no source for further
    // progress, return true.

    if (_selfIndex == -1) {
        log() << "Not choosing new sync source because we are not in the config.";
        return false;
    }

    // If the user requested a sync source change, return true.
    if (_forceSyncSourceIndex != -1) {
        log() << "Choosing new sync source because the user has requested to use "
              << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort()
              << " as a sync source";
        return true;
    }

    // In PV1 a config-version mismatch with the source also forces a change.
    if (_rsConfig.getProtocolVersion() == 1 &&
        replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
        log() << "Choosing new sync source because the config version supplied by " << currentSource
              << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
              << _rsConfig.getConfigVersion();
        return true;
    }

    const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
    // PV0 doesn't use metadata, we have to consult _rsConfig.
if (currentSourceIndex == -1) {
        log() << "Choosing new sync source because " << currentSource.toString()
              << " is not in our config";
        return true;
    }

    invariant(currentSourceIndex != _selfIndex);

    // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
    // ReplSetMetadata. Either way, never let the source's optime go backwards
    // from what we last learned via heartbeat (std::max below).
    OpTime currentSourceOpTime;
    int syncSourceIndex = -1;
    int primaryIndex = -1;
    if (oqMetadata) {
        currentSourceOpTime =
            std::max(oqMetadata->getLastOpApplied(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = oqMetadata->getSyncSourceIndex();
        primaryIndex = oqMetadata->getPrimaryIndex();
    } else {
        currentSourceOpTime =
            std::max(replMetadata.getLastOpVisible(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = replMetadata.getSyncSourceIndex();
        primaryIndex = replMetadata.getPrimaryIndex();
    }

    if (currentSourceOpTime.isNull()) {
        // Haven't received a heartbeat from the sync source yet, so can't tell if we should
        // change.
        return false;
    }

    // Change sync source if they are not ahead of us, and don't have a sync source,
    // unless they are primary.
const OpTime myLastOpTime = getMyLastAppliedOpTime();
    if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
        currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
        std::stringstream logMessage;
        logMessage << "Choosing new sync source because our current sync source, "
                   << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
                   << ") which is not ahead of ours (" << myLastOpTime
                   << "), it does not have a sync source, and it's not the primary";
        if (primaryIndex >= 0) {
            logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
        } else {
            logMessage << " (sync source does not know the primary)";
        }
        log() << logMessage.str();
        return true;
    }

    if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
        log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
                 "OpTime, "
              << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource
              << ", against the OpTimes of the other nodes in this replica set.";
    } else {
        // Look for some other member whose oplog is more than
        // maxSyncSourceLagSecs ahead of our current source's.
        unsigned int currentSecs = currentSourceOpTime.getSecs();
        unsigned int goalSecs = currentSecs + durationCount(_options.maxSyncSourceLagSecs);

        for (std::vector::const_iterator it = _memberData.begin(); it != _memberData.end();
             ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
            // A better source must be up, at least as restrictive as us in
            // voting and index-building, readable, not blacklisted, and past
            // the goal optime.
            if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
                (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
                it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
                goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) {
                log() << "Choosing new sync source because the most recent OpTime of our sync "
                         "source, "
                      << currentSource << ", is " << currentSourceOpTime.toString()
                      << " which is more than " << _options.maxSyncSourceLagSecs
                      << " behind member " << candidateConfig.getHostAndPort().toString()
                      << " whose most recent OpTime is " <<
it->getHeartbeatAppliedOpTime().toString();
                invariant(itIndex != _selfIndex);
                return true;
            }
        }
    }

    return false;
}

// Builds the ReplSetMetadata attached to command responses: term, commit
// point, config/replica-set identity, and our view of the primary and sync
// source.
rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
    const OpTime& lastVisibleOpTime) const {
    return rpc::ReplSetMetadata(_term,
                                _lastCommittedOpTime,
                                lastVisibleOpTime,
                                _rsConfig.getConfigVersion(),
                                _rsConfig.getReplicaSetId(),
                                _currentPrimaryIndex,
                                _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}

// Builds the OplogQueryMetadata attached to oplog query responses.
rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
    return rpc::OplogQueryMetadata(_lastCommittedOpTime,
                                   getMyLastAppliedOpTime(),
                                   rbid,
                                   _currentPrimaryIndex,
                                   _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}

// Fills in the replica-set HTML status summary.
void TopologyCoordinator::summarizeAsHtml(ReplSetHtmlSummary* output) {
    // TODO(dannenberg) consider putting both optimes into the htmlsummary.
    output->setSelfOptime(getMyLastAppliedOpTime());
    output->setConfig(_rsConfig);
    output->setHBData(_memberData);
    output->setSelfIndex(_selfIndex);
    output->setPrimaryIndex(_currentPrimaryIndex);
    output->setSelfState(getMemberState());
    output->setSelfHeartbeatMessage(_hbmsg);
}

// Processes a replSetRequestVotes command, deciding whether to grant this
// node's vote to the candidate described in 'args'. The checks below are
// ordered: term, config version, set name, data freshness, prior vote.
void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
                                                     ReplSetRequestVotesResponse* response) {
    response->setTerm(_term);

    if (args.getTerm() < _term) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's term (" << args.getTerm()
                                          << ") is lower than mine (" << _term << ")");
    } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's config version ("
                                          << args.getConfigVersion()
                                          << ") differs from mine ("
                                          << _rsConfig.getConfigVersion()
                                          << ")");
    } else if (args.getSetName() != _rsConfig.getReplSetName()) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's set name (" << args.getSetName()
                                          << ") differs from mine ("
                                          << _rsConfig.getReplSetName()
                                          << ")");
    } else if
(args.getLastDurableOpTime() < getMyLastAppliedOpTime()) {
        response->setVoteGranted(false);
        // NOTE(review): the comparison uses the candidate's *durable* optime
        // while the message text says "last applied" — confirm which is intended.
        response->setReason(str::stream()
                            << "candidate's data is staler than mine. candidate's last applied OpTime: "
                            << args.getLastDurableOpTime().toString()
                            << ", my last applied OpTime: "
                            << getMyLastAppliedOpTime().toString());
    } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "already voted for another candidate ("
                            << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort()
                            << ") this term ("
                            << _lastVote.getTerm()
                            << ")");
    } else {
        // An arbiter withholds its vote if it can already see a healthy
        // primary of equal or greater priority than the candidate.
        int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex());
        if (_selfConfig().isArbiter() && betterPrimary >= 0) {
            response->setVoteGranted(false);
            response->setReason(str::stream()
                                << "can see a healthy primary ("
                                << _rsConfig.getMemberAt(betterPrimary).getHostAndPort()
                                << ") of equal or greater priority");
        } else {
            // Grant the vote; a real (non-dry-run) request also records it.
            if (!args.isADryRun()) {
                _lastVote.setTerm(args.getTerm());
                _lastVote.setCandidateIndex(args.getCandidateIndex());
            }
            response->setVoteGranted(true);
        }
    }
}

// Restores the last vote (e.g. from durable storage).
void TopologyCoordinator::loadLastVote(const LastVote& lastVote) {
    _lastVote = lastVote;
}

// Records a vote for ourselves in the current term (PV1 elections).
void TopologyCoordinator::voteForMyselfV1() {
    _lastVote.setTerm(_term);
    _lastVote.setCandidateIndex(_selfIndex);
}

void TopologyCoordinator::setPrimaryIndex(long long primaryIndex) {
    _currentPrimaryIndex = primaryIndex;
}

// Transitions this node to candidate (for the given election reason) if it
// is currently an electable follower; otherwise returns NodeNotElectable
// with the reason.
Status TopologyCoordinator::becomeCandidateIfElectable(const Date_t now,
                                                       StartElectionReason reason) {
    if (_role == Role::kLeader) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
    }

    if (_role == Role::kCandidate) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
    }

    const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason);
    if (unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)};
    }

    // All checks passed, become a candidate and start election proceedings.
    _role = Role::kCandidate;

    return Status::OK();
}

void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) {
    _storageEngineSupportsReadCommitted =
        supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
}

// Marks every member's heartbeat data as restarted (used together with
// latestKnownOpTimeSinceHeartbeatRestart below).
void TopologyCoordinator::restartHeartbeats() {
    for (auto& hb : _memberData) {
        hb.restart();
    }
}

// Returns the newest optime heard from any up peer since heartbeats were
// restarted, or boost::none if some peer has not yet responded since then.
boost::optional TopologyCoordinator::latestKnownOpTimeSinceHeartbeatRestart() const {
    // The smallest OpTime in PV1.
    OpTime latest(Timestamp(0, 0), 0);
    for (size_t i = 0; i < _memberData.size(); i++) {
        auto& peer = _memberData[i];
        if (static_cast(i) == _selfIndex) {
            continue;
        }
        // If any heartbeat is not fresh enough, return none.
        if (!peer.isUpdatedSinceRestart()) {
            return boost::none;
        }
        // Ignore down members
        if (!peer.up()) {
            continue;
        }
        if (peer.getHeartbeatAppliedOpTime() > latest) {
            latest = peer.getHeartbeatAppliedOpTime();
        }
    }
    return latest;
}

}  // namespace repl
}  // namespace mongo