Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/repl/topology_coordinator.cpp        | 3216
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h          |  977
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp   | 3239
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h     |  529
-rw-r--r--  src/mongo/db/repl/topology_coordinator_test.cpp (renamed from src/mongo/db/repl/topology_coordinator_impl_test.cpp)       | 0
-rw-r--r--  src/mongo/db/repl/topology_coordinator_v1_test.cpp (renamed from src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp) | 0
6 files changed, 3557 insertions, 4404 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index e79bcad827f..84f4ccbedfe 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -30,53 +30,3209 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/repl/topology_coordinator_impl.h" -#include <string> +#include <limits> -#include "mongo/util/assert_util.h" +#include "mongo/db/audit.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/heartbeat_response_action.h" +#include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/isself.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" +#include "mongo/db/repl/repl_set_heartbeat_args_v1.h" +#include "mongo/db/repl/repl_set_heartbeat_response.h" +#include "mongo/db/repl/repl_set_html_summary.h" +#include "mongo/db/repl/repl_set_request_votes_args.h" +#include "mongo/db/repl/rslog.h" +#include "mongo/db/repl/update_position_args.h" +#include "mongo/db/server_parameters.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/hex.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { +using std::vector; +const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30); + +// Controls how caught up in replication a secondary with higher priority than the current primary +// must be before it will call for a priority takeover election. +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2); + +// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore +// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be +// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds. +MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs); + namespace { -static const int kLeaderValue = 0; -static const int kFollowerValue = 1; -static const int kCandidateValue = 2; + +template <typename T> +int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) { + return static_cast<int>(it - vec.begin()); +} + +// Maximum number of retries for a failed heartbeat. +const int kMaxHeartbeatRetries = 2; + +/** + * Returns true if the only up heartbeats are auth errors. 
+ */ +bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) { + bool foundAuthError = false; + for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) { + if (indexOfIterator(hbdata, it) == selfIndex) { + continue; + } + + if (it->up()) { + return false; + } + + if (it->hasAuthIssue()) { + foundAuthError = true; + } + } + + return foundAuthError; +} + +void appendOpTime(BSONObjBuilder* bob, + const char* elemName, + const OpTime& opTime, + const long long pv) { + if (pv == 1) { + opTime.append(bob, elemName); + } else { + bob->append(elemName, opTime.getTimestamp()); + } +} } // namespace -const TopologyCoordinator::Role TopologyCoordinator::Role::leader(kLeaderValue); -const TopologyCoordinator::Role TopologyCoordinator::Role::follower(kFollowerValue); -const TopologyCoordinator::Role TopologyCoordinator::Role::candidate(kCandidateValue); +void PingStats::start(Date_t now) { + _lastHeartbeatStartDate = now; + _numFailuresSinceLastStart = 0; +} + +void PingStats::hit(Milliseconds millis) { + _numFailuresSinceLastStart = std::numeric_limits<int>::max(); + ++count; + + value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5); +} + +void PingStats::miss() { + ++_numFailuresSinceLastStart; +} + +TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options) + : _role(Role::follower), + _term(OpTime::kUninitializedTerm), + _currentPrimaryIndex(-1), + _forceSyncSourceIndex(-1), + _options(std::move(options)), + _selfIndex(-1), + _maintenanceModeCalls(0), + _followerMode(MemberState::RS_STARTUP2) { + invariant(getMemberState() == MemberState::RS_STARTUP); + // Need an entry for self in the memberHearbeatData. + _memberData.emplace_back(); + _memberData.back().setIsSelf(true); +} + +TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const { + return _role; +} + +void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) { + invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); + _forceSyncSourceIndex = index; +} + +HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { + return _syncSource; +} + +HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ChainingPreference chainingPreference) { + // If we are not a member of the current replica set configuration, no sync source is valid. 
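// A minimal standalone sketch (not the MongoDB implementation; names are illustrative) of
// the 4:1 smoothing used by PingStats::hit() above: the first sample seeds the value, and
// each later round-trip sample is blended as (old * 4 + new) / 5.
#include <chrono>

std::chrono::milliseconds smoothedPing(std::chrono::milliseconds previous,
                                       std::chrono::milliseconds sample,
                                       bool firstSample) {
    // Seed with the first observation, then weight history 4:1 against each new sample.
    return firstSample ? sample : (previous * 4 + sample) / 5;
}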
+ if (_selfIndex == -1) { + LOG(1) << "Cannot sync from any members because we are not in the replica set config"; + return HostAndPort(); + } + + // if we have a target we've requested to sync from, use it + if (_forceSyncSourceIndex != -1) { + invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); + _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); + _forceSyncSourceIndex = -1; + log() << "choosing sync source candidate by request: " << _syncSource; + std::string msg(str::stream() << "syncing from: " << _syncSource.toString() + << " by request"); + setMyHeartbeatMessage(now, msg); + return _syncSource; + } + + // wait for 2N pings (not counting ourselves) before choosing a sync target + int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings(); + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings + << " pings from other members before syncing"; + _syncSource = HostAndPort(); + return _syncSource; + } + + // If we are only allowed to sync from the primary, set that + if (chainingPreference == ChainingPreference::kUseConfiguration && + !_rsConfig.isChainingAllowed()) { + if (_currentPrimaryIndex == -1) { + LOG(1) << "Cannot select a sync source because chaining is" + " not allowed and primary is unknown/down"; + _syncSource = HostAndPort(); + return _syncSource; + } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { + LOG(1) << "Cannot select a sync source because chaining is not allowed and primary " + "member is blacklisted: " + << _currentPrimaryMember()->getHostAndPort(); + _syncSource = HostAndPort(); + return _syncSource; + } else if (_currentPrimaryIndex == _selfIndex) { + LOG(1) + << "Cannot select a sync source because chaining is not allowed and we are primary"; + _syncSource = HostAndPort(); + return _syncSource; + } else { + _syncSource = _currentPrimaryMember()->getHostAndPort(); + log() << "chaining not allowed, choosing primary as sync source candidate: " + << _syncSource; + std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString()); + setMyHeartbeatMessage(now, msg); + return _syncSource; + } + } + + // find the member with the lowest ping time that is ahead of me + + // choose a time that will exclude no candidates by default, in case we don't see a primary + OpTime oldestSyncOpTime; + + // Find primary's oplog time. Reject sync candidates that are more than + // _options.maxSyncSourceLagSecs seconds behind. + if (_currentPrimaryIndex != -1) { + OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime(); + + // Check if primaryOpTime is still close to 0 because we haven't received + // our first heartbeat from a new primary yet. + unsigned int maxLag = + static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs)); + if (primaryOpTime.getSecs() >= maxLag) { + oldestSyncOpTime = + OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm()); + } + } + + int closestIndex = -1; + + // Make two attempts, with less restrictive rules the second time. + // + // During the first attempt, we ignore those nodes that have a larger slave + // delay, hidden nodes or non-voting, and nodes that are excessively behind. + // + // For the second attempt include those nodes, in case those are the only ones we can reach. + // + // This loop attempts to set 'closestIndex', to select a viable candidate. 
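// The loop below makes two passes: the first applies the strict filters (voting status,
// hidden members, slave delay, excessive staleness), the second relaxes them so a
// candidate can still be found when only such nodes are reachable. A minimal standalone
// sketch of that two-pass pattern, with hypothetical names; the real loop additionally
// prefers the lowest-ping candidate among those that qualify.
#include <functional>

int pickCandidateIndex(int numCandidates,
                       const std::function<bool(int)>& strictOk,
                       const std::function<bool(int)>& relaxedOk) {
    for (int attempt = 0; attempt < 2; ++attempt) {
        const auto& passes = (attempt == 0) ? strictOk : relaxedOk;
        for (int i = 0; i < numCandidates; ++i) {
            if (passes(i)) {
                return i;  // first qualifying candidate on this pass
            }
        }
    }
    return -1;  // nothing qualified on either pass
}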
+ for (int attempts = 0; attempts < 2; ++attempts) { + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); + it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + // Don't consider ourselves. + if (itIndex == _selfIndex) { + continue; + } + + const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex)); + + // Candidate must be up to be considered. + if (!it->up()) { + LOG(2) << "Cannot select sync source because it is not up: " + << itMemberConfig.getHostAndPort(); + continue; + } + // Candidate must be PRIMARY or SECONDARY state to be considered. + if (!it->getState().readable()) { + LOG(2) << "Cannot select sync source because it is not readable: " + << itMemberConfig.getHostAndPort(); + continue; + } + + // On the first attempt, we skip candidates that do not match these criteria. + if (attempts == 0) { + // Candidate must be a voter if we are a voter. + if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) { + LOG(2) << "Cannot select sync source because we are a voter and it is not: " + << itMemberConfig.getHostAndPort(); + continue; + } + // Candidates must not be hidden. + if (itMemberConfig.isHidden()) { + LOG(2) << "Cannot select sync source because it is hidden: " + << itMemberConfig.getHostAndPort(); + continue; + } + // Candidates cannot be excessively behind. + if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) { + LOG(2) << "Cannot select sync source because it is too far behind." + << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort() + << ": " << it->getHeartbeatAppliedOpTime() + << ", oldest acceptable optime: " << oldestSyncOpTime; + continue; + } + // Candidate must not have a configured delay larger than ours. + if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) { + LOG(2) << "Cannot select sync source with larger slaveDelay than ours: " + << itMemberConfig.getHostAndPort(); + continue; + } + } + // Candidate must build indexes if we build indexes, to be considered. + if (_selfConfig().shouldBuildIndexes()) { + if (!itMemberConfig.shouldBuildIndexes()) { + LOG(2) << "Cannot select sync source with shouldBuildIndex differences: " + << itMemberConfig.getHostAndPort(); + continue; + } + } + // only consider candidates that are ahead of where we are + if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) { + LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. " + << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON() + << ", latest oplog optime of sync candidate " + << itMemberConfig.getHostAndPort() << ": " + << it->getHeartbeatAppliedOpTime().toBSON(); + continue; + } + // Candidate cannot be more latent than anything we've already considered. + if ((closestIndex != -1) && + (_getPing(itMemberConfig.getHostAndPort()) > + _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) { + LOG(2) << "Cannot select sync source with higher latency than the best candidate: " + << itMemberConfig.getHostAndPort(); + + continue; + } + // Candidate cannot be blacklisted. 
+ if (_memberIsBlacklisted(itMemberConfig, now)) { + LOG(1) << "Cannot select sync source which is blacklisted: " + << itMemberConfig.getHostAndPort(); + + continue; + } + // This candidate has passed all tests; set 'closestIndex' + closestIndex = itIndex; + } + if (closestIndex != -1) + break; // no need for second attempt + } + + if (closestIndex == -1) { + // Did not find any members to sync from + std::string msg("could not find member to sync from"); + // Only log when we had a valid sync source before + if (!_syncSource.empty()) { + log() << msg << rsLog; + } + setMyHeartbeatMessage(now, msg); + + _syncSource = HostAndPort(); + return _syncSource; + } + _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); + log() << "sync source candidate: " << _syncSource; + std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); + setMyHeartbeatMessage(now, msg); + return _syncSource; +} + +bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig, + Date_t now) const { + std::map<HostAndPort, Date_t>::const_iterator blacklisted = + _syncSourceBlacklist.find(memberConfig.getHostAndPort()); + if (blacklisted != _syncSourceBlacklist.end()) { + if (blacklisted->second > now) { + return true; + } + } + return false; +} + +void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { + LOG(2) << "blacklisting " << host << " until " << until.toString(); + _syncSourceBlacklist[host] = until; +} + +void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) { + std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host); + if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) { + LOG(2) << "unblacklisting " << host; + _syncSourceBlacklist.erase(hostItr); + } +} + +void TopologyCoordinatorImpl::clearSyncSourceBlacklist() { + _syncSourceBlacklist.clear(); +} + +void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, + BSONObjBuilder* response, + Status* result) { + response->append("syncFromRequested", target.toString()); + + if (_selfIndex == -1) { + *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync"); + return; + } + + const MemberConfig& selfConfig = _selfConfig(); + if (selfConfig.isArbiter()) { + *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync"); + return; + } + if (_selfIndex == _currentPrimaryIndex) { + *result = Status(ErrorCodes::NotSecondary, "primaries don't sync"); + return; + } + + ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd(); + int targetIndex = 0; + for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); + ++it) { + if (it->getHostAndPort() == target) { + targetConfig = it; + break; + } + ++targetIndex; + } + if (targetConfig == _rsConfig.membersEnd()) { + *result = Status(ErrorCodes::NodeNotFound, + str::stream() << "Could not find member \"" << target.toString() + << "\" in replica set"); + return; + } + if (targetIndex == _selfIndex) { + *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself"); + return; + } + if (targetConfig->isArbiter()) { + *result = Status(ErrorCodes::InvalidOptions, + str::stream() << "Cannot sync from \"" << target.toString() + << "\" because it is an arbiter"); + return; + } + if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) { + *result = Status(ErrorCodes::InvalidOptions, + str::stream() << "Cannot sync from \"" << 
target.toString() + << "\" because it does not build indexes"); + return; + } + + if (selfConfig.isVoter() && !targetConfig->isVoter()) { + *result = Status(ErrorCodes::InvalidOptions, + str::stream() << "Cannot sync from \"" << target.toString() + << "\" because it is not a voter"); + return; + } + + const MemberData& hbdata = _memberData.at(targetIndex); + if (hbdata.hasAuthIssue()) { + *result = + Status(ErrorCodes::Unauthorized, + str::stream() << "not authorized to communicate with " << target.toString()); + return; + } + if (hbdata.getHealth() == 0) { + *result = + Status(ErrorCodes::HostUnreachable, + str::stream() << "I cannot reach the requested member: " << target.toString()); + return; + } + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { + warning() << "attempting to sync from " << target << ", but its latest opTime is " + << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is " + << lastOpApplied.getSecs() << " so this may not work"; + response->append("warning", + str::stream() << "requested member \"" << target.toString() + << "\" is more than 10 seconds behind us"); + // not returning bad Status, just warning + } + + HostAndPort prevSyncSource = getSyncSourceAddress(); + if (!prevSyncSource.empty()) { + response->append("prevSyncTarget", prevSyncSource.toString()); + } + + setForceSyncSourceIndex(targetIndex); + *result = Status::OK(); +} + +void TopologyCoordinatorImpl::prepareFreshResponse( + const ReplicationCoordinator::ReplSetFreshArgs& args, + const Date_t now, + BSONObjBuilder* response, + Status* result) { + if (_rsConfig.getProtocolVersion() != 0) { + *result = Status(ErrorCodes::BadValue, + str::stream() << "replset: incompatible replset protocol version: " + << _rsConfig.getProtocolVersion()); + return; + } + + if (_selfIndex == -1) { + *result = Status(ErrorCodes::ReplicaSetNotFound, + "Cannot participate in elections because not initialized"); + return; + } + + if (args.setName != _rsConfig.getReplSetName()) { + *result = + Status(ErrorCodes::ReplicaSetNotFound, + str::stream() << "Wrong repl set name. 
Expected: " << _rsConfig.getReplSetName() + << ", received: " + << args.setName); + return; + } + + if (args.id == static_cast<unsigned>(_selfConfig().getId())) { + *result = Status(ErrorCodes::BadValue, + str::stream() << "Received replSetFresh command from member with the " + "same member ID as ourself: " + << args.id); + return; + } + + bool weAreFresher = false; + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + if (_rsConfig.getConfigVersion() > args.cfgver) { + log() << "replSet member " << args.who << " is not yet aware its cfg version " + << args.cfgver << " is stale"; + response->append("info", "config version stale"); + weAreFresher = true; + } + // check not only our own optime, but any other member we can reach + else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) { + weAreFresher = true; + } + response->appendDate("opTime", + Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL())); + response->append("fresher", weAreFresher); + + std::string errmsg; + bool doVeto = _shouldVetoMember(args, now, &errmsg); + response->append("veto", doVeto); + if (doVeto) { + response->append("errmsg", errmsg); + } + *result = Status::OK(); +} + +bool TopologyCoordinatorImpl::_shouldVetoMember( + const ReplicationCoordinator::ReplSetFreshArgs& args, + const Date_t& now, + std::string* errmsg) const { + if (_rsConfig.getConfigVersion() < args.cfgver) { + // We are stale; do not veto. + return false; + } + + const unsigned int memberID = args.id; + const int hopefulIndex = _getMemberIndex(memberID); + invariant(hopefulIndex != _selfIndex); + const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); + + if (hopefulIndex == -1) { + *errmsg = str::stream() << "replSet couldn't find member with id " << memberID; + return true; + } + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + if (_iAmPrimary() && + lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) { + // hbinfo is not updated for ourself, so if we are primary we have to check the + // primary's last optime separately + *errmsg = str::stream() << "I am already primary, " + << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() + << " can try again once I've stepped down"; + return true; + } + + if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) && + (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >= + _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) { + // other members might be aware of more up-to-date nodes + *errmsg = + str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() + << " is trying to elect itself but " + << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() + << " is already primary and more up-to-date"; + return true; + } + + if ((highestPriorityIndex != -1)) { + const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex); + const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex); + + if (priorityMember.getPriority() > hopefulMember.getPriority()) { + *errmsg = str::stream() << hopefulMember.getHostAndPort().toString() + << " has lower priority of " << hopefulMember.getPriority() + << " than " << priorityMember.getHostAndPort().toString() + << " which has a priority of " << priorityMember.getPriority(); + return true; + } + } + + UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex); + reason &= ~RefusesToStand; + if (reason) { + *errmsg = str::stream() << "I don't think " + << 
_rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() + << " is electable because the " + << _getUnelectableReasonString(reason); + return true; + } + + return false; +} + +// produce a reply to a received electCmd +void TopologyCoordinatorImpl::prepareElectResponse( + const ReplicationCoordinator::ReplSetElectArgs& args, + const Date_t now, + BSONObjBuilder* response, + Status* result) { + if (_rsConfig.getProtocolVersion() != 0) { + *result = Status(ErrorCodes::BadValue, + str::stream() << "replset: incompatible replset protocol version: " + << _rsConfig.getProtocolVersion()); + return; + } + if (_selfIndex == -1) { + *result = Status(ErrorCodes::ReplicaSetNotFound, + "Cannot participate in election because not initialized"); + return; + } + + const long long myver = _rsConfig.getConfigVersion(); + const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); + + const MemberConfig* primary = _currentPrimaryMember(); + const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid); + const MemberConfig* highestPriority = + highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex); + + int vote = 0; + if (args.set != _rsConfig.getReplSetName()) { + log() << "replSet error received an elect request for '" << args.set + << "' but our set name is '" << _rsConfig.getReplSetName() << "'"; + } else if (myver < args.cfgver) { + // we are stale. don't vote + log() << "replSetElect not voting because our config version is stale. Our version: " + << myver << ", their version: " << args.cfgver; + } else if (myver > args.cfgver) { + // they are stale! + log() << "replSetElect command received stale config version # during election. " + "Our version: " + << myver << ", their version: " << args.cfgver; + vote = -10000; + } else if (!hopeful) { + log() << "replSetElect couldn't find member with id " << args.whoid; + vote = -10000; + } else if (_iAmPrimary()) { + log() << "I am already primary, " << hopeful->getHostAndPort().toString() + << " can try again once I've stepped down"; + vote = -10000; + } else if (primary) { + log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but " + << primary->getHostAndPort().toString() << " is already primary"; + vote = -10000; + } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) { + // TODO(spencer): What if the lower-priority member is more up-to-date? 
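// The vote-lease check further below withholds a "yea" vote while a lease granted to a
// different candidate is still live (VoteLease::leaseTime is 30 seconds). A minimal
// standalone sketch of that rule, with hypothetical names and std::chrono types:
#include <chrono>

struct VoteLeaseSketch {
    std::chrono::steady_clock::time_point when{};  // time of our last "yea" vote
    long long whoId = -1;                           // member id we voted for
};

bool mayVoteFor(const VoteLeaseSketch& lease,
                long long candidateId,
                std::chrono::steady_clock::time_point now) {
    const auto kLeaseTime = std::chrono::seconds(30);  // mirrors VoteLease::leaseTime
    // Withhold the vote only while a lease for a *different* candidate is still live.
    return lease.whoId == candidateId || lease.when + kLeaseTime < now;
}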
+ log() << hopeful->getHostAndPort().toString() << " has lower priority than " + << highestPriority->getHostAndPort().toString(); + vote = -10000; + } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) { + log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for " + << _voteLease.whoHostAndPort.toString() << ' ' + << durationCount<Seconds>(now - _voteLease.when) << " secs ago"; + } else { + _voteLease.when = now; + _voteLease.whoId = args.whoid; + _voteLease.whoHostAndPort = hopeful->getHostAndPort(); + vote = _selfConfig().getNumVotes(); + invariant(hopeful->getId() == args.whoid); + if (vote > 0) { + log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " (" + << args.whoid << ')'; + } + } + + response->append("vote", vote); + response->append("round", args.round); + *result = Status::OK(); +} + +// produce a reply to a heartbeat +Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, + const ReplSetHeartbeatArgs& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response) { + if (args.getProtocolVersion() != 1) { + return Status(ErrorCodes::BadValue, + str::stream() << "replset: incompatible replset protocol version: " + << args.getProtocolVersion()); + } + + // Verify that replica set names match + const std::string rshb = args.getSetName(); + if (ourSetName != rshb) { + log() << "replSet set names do not match, ours: " << ourSetName + << "; remote node's: " << rshb; + response->noteMismatched(); + return Status(ErrorCodes::InconsistentReplicaSetNames, + str::stream() << "Our set name of " << ourSetName << " does not match name " + << rshb + << " reported by remote node"); + } + + const MemberState myState = getMemberState(); + if (_selfIndex == -1) { + if (myState.removed()) { + return Status(ErrorCodes::InvalidReplicaSetConfig, + "Our replica set configuration is invalid or does not include us"); + } + } else { + invariant(_rsConfig.getReplSetName() == args.getSetName()); + if (args.getSenderId() == _selfConfig().getId()) { + return Status(ErrorCodes::BadValue, + str::stream() << "Received heartbeat from member with the same " + "member ID as ourself: " + << args.getSenderId()); + } + } + + // This is a replica set + response->noteReplSet(); + + response->setSetName(ourSetName); + response->setState(myState.s); + if (myState.primary()) { + response->setElectionTime(_electionTime); + } + + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + const OpTime lastOpDurable = getMyLastDurableOpTime(); + + // Are we electable + response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)); + + // Heartbeat status message + response->setHbMsg(_getHbmsg(now)); + response->setTime(duration_cast<Seconds>(now - Date_t{})); + response->setAppliedOpTime(lastOpApplied); + response->setDurableOpTime(lastOpDurable); + + if (!_syncSource.empty()) { + response->setSyncingTo(_syncSource); + } + + if (!_rsConfig.isInitialized()) { + response->setConfigVersion(-2); + return Status::OK(); + } + + const long long v = _rsConfig.getConfigVersion(); + response->setConfigVersion(v); + // Deliver new config if caller's version is older than ours + if (v > args.getConfigVersion()) { + response->setConfig(_rsConfig); + } + + // Resolve the caller's id in our Member list + int from = -1; + if (v == args.getConfigVersion() && args.getSenderId() != -1) { + from = _getMemberIndex(args.getSenderId()); + } + if (from == -1) { + // Can't find the member, so 
we leave out the stateDisagreement field + return Status::OK(); + } + invariant(from != _selfIndex); + + // if we thought that this node is down, let it know + if (!_memberData.at(from).up()) { + response->noteStateDisagreement(); + } + + // note that we got a heartbeat from this node + _memberData.at(from).setLastHeartbeatRecv(now); + return Status::OK(); +} + +Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, + const ReplSetHeartbeatArgsV1& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response) { + // Verify that replica set names match + const std::string rshb = args.getSetName(); + if (ourSetName != rshb) { + log() << "replSet set names do not match, ours: " << ourSetName + << "; remote node's: " << rshb; + return Status(ErrorCodes::InconsistentReplicaSetNames, + str::stream() << "Our set name of " << ourSetName << " does not match name " + << rshb + << " reported by remote node"); + } + + const MemberState myState = getMemberState(); + if (_selfIndex == -1) { + if (myState.removed()) { + return Status(ErrorCodes::InvalidReplicaSetConfig, + "Our replica set configuration is invalid or does not include us"); + } + } else { + if (args.getSenderId() == _selfConfig().getId()) { + return Status(ErrorCodes::BadValue, + str::stream() << "Received heartbeat from member with the same " + "member ID as ourself: " + << args.getSenderId()); + } + } + + response->setSetName(ourSetName); + + response->setState(myState.s); + + if (myState.primary()) { + response->setElectionTime(_electionTime); + } + + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + const OpTime lastOpDurable = getMyLastDurableOpTime(); + response->setAppliedOpTime(lastOpApplied); + response->setDurableOpTime(lastOpDurable); + + if (_currentPrimaryIndex != -1) { + response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId()); + } + + response->setTerm(_term); + + if (!_syncSource.empty()) { + response->setSyncingTo(_syncSource); + } + + if (!_rsConfig.isInitialized()) { + response->setConfigVersion(-2); + return Status::OK(); + } + + const long long v = _rsConfig.getConfigVersion(); + response->setConfigVersion(v); + // Deliver new config if caller's version is older than ours + if (v > args.getConfigVersion()) { + response->setConfig(_rsConfig); + } + + // Resolve the caller's id in our Member list + int from = -1; + if (v == args.getConfigVersion() && args.getSenderId() != -1) { + from = _getMemberIndex(args.getSenderId()); + } + if (from == -1) { + return Status::OK(); + } + invariant(from != _selfIndex); + + // note that we got a heartbeat from this node + _memberData.at(from).setLastHeartbeatRecv(now); + return Status::OK(); +} + +int TopologyCoordinatorImpl::_getMemberIndex(int id) const { + int index = 0; + for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); + ++it, ++index) { + if (it->getId() == id) { + return index; + } + } + return -1; +} + +std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest( + Date_t now, const std::string& ourSetName, const HostAndPort& target) { + PingStats& hbStats = _pings[target]; + Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); + if (!_rsConfig.isInitialized() || + (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || + (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { + // This is either the first request ever for "target", or the heartbeat timeout has + // passed, so we're starting a "new" 
heartbeat. + hbStats.start(now); + alreadyElapsed = Milliseconds(0); + } + ReplSetHeartbeatArgs hbArgs; + hbArgs.setProtocolVersion(1); + hbArgs.setCheckEmpty(false); + if (_rsConfig.isInitialized()) { + hbArgs.setSetName(_rsConfig.getReplSetName()); + hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); + if (_selfIndex >= 0) { + const MemberConfig& me = _selfConfig(); + hbArgs.setSenderHost(me.getHostAndPort()); + hbArgs.setSenderId(me.getId()); + } + } else { + hbArgs.setSetName(ourSetName); + hbArgs.setConfigVersion(-2); + } + if (serverGlobalParams.featureCompatibility.getVersion() != + ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) { + hbArgs.setHeartbeatVersion(1); + } + + const Milliseconds timeoutPeriod( + _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis() + : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod}); + const Milliseconds timeout = timeoutPeriod - alreadyElapsed; + return std::make_pair(hbArgs, timeout); +} + +std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1( + Date_t now, const std::string& ourSetName, const HostAndPort& target) { + PingStats& hbStats = _pings[target]; + Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64()); + if (!_rsConfig.isInitialized() || + (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || + (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { + // This is either the first request ever for "target", or the heartbeat timeout has + // passed, so we're starting a "new" heartbeat. + hbStats.start(now); + alreadyElapsed = Milliseconds(0); + } + ReplSetHeartbeatArgsV1 hbArgs; + if (_rsConfig.isInitialized()) { + hbArgs.setSetName(_rsConfig.getReplSetName()); + hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); + if (_selfIndex >= 0) { + const MemberConfig& me = _selfConfig(); + hbArgs.setSenderId(me.getId()); + hbArgs.setSenderHost(me.getHostAndPort()); + } + hbArgs.setTerm(_term); + } else { + hbArgs.setSetName(ourSetName); + // Config version -2 is for uninitialized config. + hbArgs.setConfigVersion(-2); + hbArgs.setTerm(OpTime::kInitialTerm); + } + if (serverGlobalParams.featureCompatibility.getVersion() != + ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) { + hbArgs.setHeartbeatVersion(1); + } + + const Milliseconds timeoutPeriod( + _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis() + : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod}); + const Milliseconds timeout(timeoutPeriod - alreadyElapsed); + return std::make_pair(hbArgs, timeout); +} + +HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse( + Date_t now, + Milliseconds networkRoundTripTime, + const HostAndPort& target, + const StatusWith<ReplSetHeartbeatResponse>& hbResponse) { + const MemberState originalState = getMemberState(); + PingStats& hbStats = _pings[target]; + invariant(hbStats.getLastHeartbeatStartDate() != Date_t()); + const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) || + (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed); + if (!hbResponse.isOK()) { + if (isUnauthorized) { + hbStats.hit(networkRoundTripTime); + } else { + hbStats.miss(); + } + } else { + hbStats.hit(networkRoundTripTime); + // Log diagnostics. 
+ if (hbResponse.getValue().isStateDisagreement()) { + LOG(1) << target << " thinks that we are down because they cannot send us heartbeats."; + } + } + + // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order + // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to + // see the majority of the cluster. + // + // Arbiters also decrease their heartbeat interval to at most half the election timeout period. + Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval(); + if (_rsConfig.getProtocolVersion() == 1) { + if (getMemberState().arbiter()) { + heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, + _rsConfig.getHeartbeatInterval()); + } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) { + heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, + _rsConfig.getHeartbeatInterval() / 4); + } + } + + const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); + Date_t nextHeartbeatStartDate; + // Determine next heartbeat start time. + if (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries && + alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod()) { + // There are still retries left, let's use one. + nextHeartbeatStartDate = now; + } else { + nextHeartbeatStartDate = now + heartbeatInterval; + } + + if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) { + const long long currentConfigVersion = + _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2; + const ReplSetConfig& newConfig = hbResponse.getValue().getConfig(); + if (newConfig.getConfigVersion() > currentConfigVersion) { + HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction(); + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + return nextAction; + } else { + // Could be we got the newer version before we got the response, or the + // target erroneously sent us one, even through it isn't newer. + if (newConfig.getConfigVersion() < currentConfigVersion) { + LOG(1) << "Config version from heartbeat was older than ours."; + } else { + LOG(2) << "Config from heartbeat response was same as ours."; + } + if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component, + ::mongo::LogstreamBuilder::severityCast(2))) { + LogstreamBuilder lsb = log(); + if (_rsConfig.isInitialized()) { + lsb << "Current config: " << _rsConfig.toBSON() << "; "; + } + lsb << "Config in heartbeat: " << newConfig.toBSON(); + } + } + } + + // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do, + // so return early. + if (!_rsConfig.isInitialized()) { + HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + return nextAction; + } + // If we're not in the config, we don't need to respond to heartbeats. 
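// The scheduling above retries a failed heartbeat immediately while retries remain and
// the heartbeat timeout has not yet expired, and otherwise waits a full interval. A
// minimal standalone sketch with hypothetical names; the real code also shrinks the
// interval for arbiters and for non-primaries that have no sync source.
#include <chrono>

std::chrono::steady_clock::time_point nextHeartbeatStart(
    std::chrono::steady_clock::time_point now,
    int failuresSinceLastStart,
    std::chrono::milliseconds alreadyElapsed,
    std::chrono::milliseconds heartbeatTimeout,
    std::chrono::milliseconds heartbeatInterval) {
    const int kMaxRetries = 2;  // mirrors kMaxHeartbeatRetries above
    if (failuresSinceLastStart <= kMaxRetries && alreadyElapsed < heartbeatTimeout) {
        return now;  // a retry is still allowed within this heartbeat window
    }
    return now + heartbeatInterval;  // otherwise resume on the normal cadence
}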
+ if (_selfIndex == -1) { + LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target + << " -- current config: " << _rsConfig.toBSON(); + HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + return nextAction; + } + const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target); + if (memberIndex == -1) { + LOG(1) << "Could not find " << target << " in current config so ignoring --" + " current config: " + << _rsConfig.toBSON(); + HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + return nextAction; + } + + invariant(memberIndex != _selfIndex); + + MemberData& hbData = _memberData.at(memberIndex); + const MemberConfig member = _rsConfig.getMemberAt(memberIndex); + bool advancedOpTime = false; + if (!hbResponse.isOK()) { + if (isUnauthorized) { + hbData.setAuthIssue(now); + } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries || + alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod()) { + hbData.setDownValues(now, hbResponse.getStatus().reason()); + } else { + LOG(3) << "Bad heartbeat response from " << target << "; trying again; Retries left: " + << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; " + << alreadyElapsed << " have already elapsed"; + } + } else { + ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue()); + LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId() + << ", msg: " << hbr.getHbMsg(); + advancedOpTime = hbData.setUpValues(now, std::move(hbr)); + } + + HeartbeatResponseAction nextAction; + if (_rsConfig.getProtocolVersion() == 0) { + nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now); + } else { + nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now); + } + + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + nextAction.setAdvancedOpTime(advancedOpTime); + return nextAction; +} + +bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime, + int numNodes, + bool durablyWritten) { + // Replication progress that is for some reason ahead of us should not allow us to + // satisfy a write concern if we aren't caught up ourselves. + OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime(); + if (myOpTime < targetOpTime) { + return false; + } + + for (auto&& memberData : _memberData) { + const OpTime& memberOpTime = + durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime(); + if (memberOpTime >= targetOpTime) { + --numNodes; + } + + if (numNodes <= 0) { + return true; + } + } + return false; +} + +bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime, + const ReplSetTagPattern& tagPattern, + bool durablyWritten) { + ReplSetTagMatch matcher(tagPattern); + for (auto&& memberData : _memberData) { + const OpTime& memberOpTime = + durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime(); + if (memberOpTime >= opTime) { + // This node has reached the desired optime, now we need to check if it is a part + // of the tagPattern. 
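// haveNumNodesReachedOpTime() above answers a w:<N> write concern: it fails fast if this
// node itself is behind the target optime, then counts members whose reported optime has
// reached it. A minimal standalone sketch over plain integers standing in for optimes
// (hypothetical names):
#include <vector>

bool numNodesReached(long long myOpTime,
                     const std::vector<long long>& memberOpTimes,
                     long long target,
                     int requiredNodes) {
    if (myOpTime < target) {
        return false;  // progress reported ahead of us cannot satisfy the write concern
    }
    for (long long opTime : memberOpTimes) {
        if (opTime >= target && --requiredNodes <= 0) {
            return true;
        }
    }
    return false;
}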
+ int memberIndex = memberData.getConfigIndex(); + invariant(memberIndex >= 0); + const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex); + for (MemberConfig::TagIterator it = memberConfig.tagsBegin(); + it != memberConfig.tagsEnd(); + ++it) { + if (matcher.update(*it)) { + return true; + } + } + } + } + return false; +} + +HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) { + bool stepdown = false; + for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) { + auto& memberData = _memberData[memberIndex]; + if (!memberData.isSelf() && !memberData.lastUpdateStale() && + now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) { + memberData.markLastUpdateStale(); + if (_iAmPrimary()) { + stepdown = stepdown || setMemberAsDown(now, memberIndex); + } + } + } + if (stepdown) { + log() << "can't see a majority of the set, relinquishing primary"; + return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); + } + return HeartbeatResponseAction::makeNoAction(); +} + +std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op, + bool durablyWritten, + bool skipSelf) { + std::vector<HostAndPort> hosts; + for (const auto& memberData : _memberData) { + if (skipSelf && memberData.isSelf()) { + continue; + } + + if (durablyWritten) { + if (memberData.getLastDurableOpTime() < op) { + continue; + } + } else if (memberData.getLastAppliedOpTime() < op) { + continue; + } + + hosts.push_back(memberData.getHostAndPort()); + } + return hosts; +} + +bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) { + invariant(memberIndex != _selfIndex); + invariant(memberIndex != -1); + invariant(_currentPrimaryIndex == _selfIndex); + MemberData& hbData = _memberData.at(memberIndex); + hbData.setDownValues(now, "no response within election timeout period"); + + if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) { + return true; + } + + return false; +} + +std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const { + Date_t earliestDate = Date_t::max(); + int earliestMemberId = -1; + for (const auto& memberData : _memberData) { + if (memberData.isSelf()) { + continue; + } + if (memberData.lastUpdateStale()) { + // Already stale. 
+ continue; + } + LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate(); + if (earliestDate > memberData.getLastUpdate()) { + earliestDate = memberData.getLastUpdate(); + earliestMemberId = memberData.getMemberId(); + } + } + LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate; + return std::make_pair(earliestMemberId, earliestDate); +} + +void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) { + for (auto&& memberData : _memberData) + memberData.updateLiveness(now); +} + +void TopologyCoordinatorImpl::resetMemberTimeouts( + Date_t now, const stdx::unordered_set<HostAndPort>& member_set) { + for (auto&& memberData : _memberData) { + if (member_set.count(memberData.getHostAndPort())) + memberData.updateLiveness(now); + } +} + +OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const { + return _selfMemberData().getLastAppliedOpTime(); +} + +OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const { + return _selfMemberData().getLastDurableOpTime(); +} + +MemberData* TopologyCoordinatorImpl::getMyMemberData() { + return &_memberData[_selfMemberDataIndex()]; +} + +MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) { + const int memberIndex = _getMemberIndex(memberId); + if (memberIndex >= 0) + return &_memberData[memberIndex]; + return nullptr; +} + +MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) { + for (auto& memberData : _memberData) { + if (memberData.getRid() == rid) + return &memberData; + } + return nullptr; +} + +MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) { + invariant(!_memberData.empty()); // Must always have our own entry first. + invariant(!_rsConfig.isInitialized()); // Used only for master-slave. + _memberData.emplace_back(); + auto* result = &_memberData.back(); + result->setRid(rid); + return result; +} + +HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( + int updatedConfigIndex, const MemberState& originalState, Date_t now) { + // + // Updates the local notion of which remote node, if any is primary. + // Start the priority takeover process if we are eligible. + // + + invariant(updatedConfigIndex != _selfIndex); + + // If we are missing from the config, do not participate in primary maintenance or election. + if (_selfIndex == -1) { + return HeartbeatResponseAction::makeNoAction(); + } + // If we are the primary, there must be no other primary, otherwise its higher term would + // have already made us step down. + if (_currentPrimaryIndex == _selfIndex) { + return HeartbeatResponseAction::makeNoAction(); + } + + // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex. + int primaryIndex = -1; + for (size_t i = 0; i < _memberData.size(); i++) { + const MemberData& member = _memberData.at(i); + if (member.getState().primary() && member.up()) { + if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) { + primaryIndex = i; + } + } + } + _currentPrimaryIndex = primaryIndex; + if (_currentPrimaryIndex == -1) { + return HeartbeatResponseAction::makeNoAction(); + } + + // Clear last heartbeat message on ourselves. + setMyHeartbeatMessage(now, ""); + + // Takeover when the replset is stable. + // + // Take over the primary only if the remote primary is in the latest term I know. + // This is done only when we get a heartbeat response from the primary. 
+ // Otherwise, there must be an outstanding election, which may succeed or not, but + // the remote primary will become aware of that election eventually and step down. + if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) { + + // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled. + bool catchupTakeoverDisabled = + ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() || + ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay(); + if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() < + _memberData.at(_selfIndex).getLastAppliedOpTime())) { + LOG(2) << "I can take over the primary due to fresher data." + << " Current primary index: " << primaryIndex << " in term " + << _memberData.at(primaryIndex).getTerm() << "." + << " Current primary optime: " + << _memberData.at(primaryIndex).getLastAppliedOpTime() + << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime(); + + return HeartbeatResponseAction::makeCatchupTakeoverAction(); + } + + if (_rsConfig.getMemberAt(primaryIndex).getPriority() < + _rsConfig.getMemberAt(_selfIndex).getPriority()) { + LOG(4) << "I can take over the primary due to higher priority." + << " Current primary index: " << primaryIndex << " in term " + << _memberData.at(primaryIndex).getTerm(); + + return HeartbeatResponseAction::makePriorityTakeoverAction(); + } + } + return HeartbeatResponseAction::makeNoAction(); +} + +HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData( + int updatedConfigIndex, const MemberState& originalState, Date_t now) { + // This method has two interrelated responsibilities, performed in two phases. + // + // First, it updates the local notion of which remote node, if any is primary. In the + // process, it may request a remote primary to step down because there is a higher priority + // node waiting, or because the local node thinks it is primary and that it has a more + // recent electionTime. It may instead decide that the local node should step down itself, + // because a remote has a more recent election time. + // + // Second, if there is no remote primary, and the local node is not primary, it considers + // whether or not to stand for election. + invariant(updatedConfigIndex != _selfIndex); + + // We are missing from the config, so do not participate in primary maintenance or election. + if (_selfIndex == -1) { + return HeartbeatResponseAction::makeNoAction(); + } + + //////////////////// + // Phase 1 + //////////////////// + + // If we believe the node whose data was just updated is primary, confirm that + // the updated data supports that notion. If not, erase our notion of who is primary. + if (updatedConfigIndex == _currentPrimaryIndex) { + const MemberData& updatedHBData = _memberData.at(updatedConfigIndex); + if (!updatedHBData.up() || !updatedHBData.getState().primary()) { + _currentPrimaryIndex = -1; + } + } + + // If the current primary is not highest priority and up to date (within 10s), + // have them/me stepdown. 
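// The stepdown logic that follows only acts when the higher-priority member is "close
// enough" to the newest optime in the set, which _isOpTimeCloseEnoughToLatestToElect()
// defines as within 10 seconds. A minimal standalone sketch using plain seconds
// (hypothetical names; addition is used instead of subtraction to avoid underflow):
bool closeEnoughToElect(unsigned long long candidateOpTimeSecs,
                        unsigned long long latestKnownOpTimeSecs) {
    return candidateOpTimeSecs + 10 >= latestKnownOpTimeSecs;
}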
+ if (_currentPrimaryIndex != -1) { + // check if we should ask the primary (possibly ourselves) to step down + const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); + if (highestPriorityIndex != -1) { + const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex); + const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex); + const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex + ? getMyLastAppliedOpTime() + : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime(); + + if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) && + _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) { + const OpTime latestOpTime = _latestKnownOpTime(); + + if (_iAmPrimary()) { + if (_leaderMode == LeaderMode::kSteppingDown) { + return HeartbeatResponseAction::makeNoAction(); + } + log() << "Stepping down self (priority " << currentPrimaryMember.getPriority() + << ") because " << highestPriorityMember.getHostAndPort() + << " has higher priority " << highestPriorityMember.getPriority() + << " and is only " + << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs()) + << " seconds behind me"; + const Date_t until = + now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval(); + if (_electionSleepUntil < until) { + _electionSleepUntil = until; + } + return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); + } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) { + // If this node is the highest priority node, and it is not in + // an inter-election sleep period, ask the current primary to step down. + // This is an optimization, because the remote primary will almost certainly + // notice this node's electability promptly, via its own heartbeat process. + log() << "Requesting that " << currentPrimaryMember.getHostAndPort() + << " (priority " << currentPrimaryMember.getPriority() + << ") step down because I have higher priority " + << highestPriorityMember.getPriority() << " and am only " + << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs()) + << " seconds behind it"; + int primaryIndex = _currentPrimaryIndex; + _currentPrimaryIndex = -1; + return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex); + } + } + } + } + + // Scan the member list's heartbeat data for who is primary, and update + // _currentPrimaryIndex and _role, or request a remote to step down, as necessary. + { + int remotePrimaryIndex = -1; + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); + it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + if (itIndex == _selfIndex) { + continue; + } + + if (it->getState().primary() && it->up()) { + if (remotePrimaryIndex != -1) { + // two other nodes think they are primary (asynchronously polled) + // -- wait for things to settle down. + warning() << "two remote primaries (transiently)"; + return HeartbeatResponseAction::makeNoAction(); + } + remotePrimaryIndex = itIndex; + } + } + + if (remotePrimaryIndex != -1) { + // If it's the same as last time, don't do anything further. + if (_currentPrimaryIndex == remotePrimaryIndex) { + return HeartbeatResponseAction::makeNoAction(); + } + // Clear last heartbeat message on ourselves (why?) + setMyHeartbeatMessage(now, ""); + + // If we are also primary, this is a problem. Determine who should step down. 
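// When two nodes both believe they are primary, the code below breaks the tie by election
// time: the primary elected earlier steps down. A minimal standalone sketch with plain
// integers standing in for election timestamps (hypothetical names):
enum class StepDown { kSelf, kRemote };

StepDown resolveTwoPrimaries(unsigned long long myElectionTime,
                             unsigned long long remoteElectionTime) {
    // The more recent election wins; the older primary relinquishes.
    return remoteElectionTime > myElectionTime ? StepDown::kSelf : StepDown::kRemote;
}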
+ if (_iAmPrimary()) { + Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime(); + log() << "another primary seen with election time " << remoteElectionTime + << " my election time is " << _electionTime; + + // Step down whomever has the older election time. + if (remoteElectionTime > _electionTime) { + if (_leaderMode == LeaderMode::kSteppingDown) { + return HeartbeatResponseAction::makeNoAction(); + } + log() << "stepping down; another primary was elected more recently"; + return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); + } else { + log() << "another PRIMARY detected and it should step down" + " since it was elected earlier than me"; + return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex); + } + } + + _currentPrimaryIndex = remotePrimaryIndex; + return HeartbeatResponseAction::makeNoAction(); + } + } + + //////////////////// + // Phase 2 + //////////////////// + + // We do not believe any remote to be primary. + + // If we are primary, check if we can still see majority of the set; + // stepdown if we can't. + if (_iAmPrimary()) { + if (CannotSeeMajority & + _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) { + if (_leaderMode == LeaderMode::kSteppingDown) { + return HeartbeatResponseAction::makeNoAction(); + } + log() << "can't see a majority of the set, relinquishing primary"; + return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); + } + + LOG(2) << "Choosing to remain primary"; + return HeartbeatResponseAction::makeNoAction(); + } + + fassert(18505, _currentPrimaryIndex == -1); + + const MemberState currentState = getMemberState(); + if (originalState.recovering() && currentState.secondary()) { + // We just transitioned from RECOVERING to SECONDARY, this can only happen if we + // received a heartbeat with an auth error when previously all the heartbeats we'd + // received had auth errors. In this case, don't return makeElectAction() because + // that could cause the election to start before the ReplicationCoordinator has updated + // its notion of the member state to SECONDARY. Instead return noAction so that the + // ReplicationCooridinator knows to update its tracking of the member state off of the + // TopologyCoordinator, and leave starting the election until the next heartbeat comes + // back. + return HeartbeatResponseAction::makeNoAction(); + } + + // At this point, there is no primary anywhere. Check to see if we should become a candidate. + const auto status = checkShouldStandForElection(now); + if (!status.isOK()) { + // NOTE: This log line is checked in unit test(s). 
+ LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason(); + return HeartbeatResponseAction::makeNoAction(); + } + fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout)); + return HeartbeatResponseAction::makeElectAction(); +} + +Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const { + if (_currentPrimaryIndex != -1) { + return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"}; + } + invariant(_role != Role::leader); + + if (_role == Role::candidate) { + return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; + } + + const UnelectableReasonMask unelectableReason = + _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout); + if (NotCloseEnoughToLatestOptime & unelectableReason) { + return {ErrorCodes::NodeNotElectable, + str::stream() << "Not standing for election because " + << _getUnelectableReasonString(unelectableReason) + << "; my last optime is " + << getMyLastAppliedOpTime().toString() + << " and the newest is " + << _latestKnownOpTime().toString()}; + } + if (unelectableReason) { + return {ErrorCodes::NodeNotElectable, + str::stream() << "Not standing for election because " + << _getUnelectableReasonString(unelectableReason)}; + } + if (_electionSleepUntil > now) { + if (_rsConfig.getProtocolVersion() == 1) { + return { + ErrorCodes::NodeNotElectable, + str::stream() << "Not standing for election before " + << dateToISOStringLocal(_electionSleepUntil) + << " because I stood up or learned about a new term too recently"}; + } else { + return {ErrorCodes::NodeNotElectable, + str::stream() << "Not standing for election before " + << dateToISOStringLocal(_electionSleepUntil) + << " because I stood too recently"}; + } + } + // All checks passed. Start election proceedings. + return Status::OK(); +} + +bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const { + int vUp = 0; + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + if (itIndex == _selfIndex || it->up()) { + vUp += _rsConfig.getMemberAt(itIndex).getNumVotes(); + } + } + + return vUp * 2 > _rsConfig.getTotalVotingMembers(); +} + +int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority( + const int candidateIndex) const { + const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority(); + for (auto it = _memberData.begin(); it != _memberData.end(); ++it) { + if (!it->up() || it->getState() != MemberState::RS_PRIMARY) { + continue; + } + const int itIndex = indexOfIterator(_memberData, it); + const double priority = _rsConfig.getMemberAt(itIndex).getPriority(); + if (itIndex != candidateIndex && priority >= candidatePriority) { + return itIndex; + } + } + + return -1; +} + +bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const { + const OpTime latestKnownOpTime = _latestKnownOpTime(); + // Use addition instead of subtraction to avoid overflow. + return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs()); +} + +bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const { + const OpTime latestKnownOpTime = _latestKnownOpTime(); + + // Rules are: + // - If the terms don't match, we don't call for priority takeover. 
+ // - If our optime and the latest optime happen in different seconds, our optime must be within + // at least priorityTakeoverFreshnessWindowSeconds seconds of the latest optime. + // - If our optime and the latest optime happen in the same second, our optime must be within + // at least 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp + // must be within 1000). This is to handle the case where a primary had its clock set far into + // the future, took some writes, then had its clock set back. In that case the timestamp + // component of all future oplog entries generated will be the same, until real world time + // passes the timestamp component of the last oplog entry. + + const OpTime ourLastOpApplied = getMyLastAppliedOpTime(); + if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) { + return false; + } + + if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) { + return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >= + latestKnownOpTime.getTimestamp().getSecs(); + } else { + return ourLastOpApplied.getTimestamp().getInc() + 1000 >= + latestKnownOpTime.getTimestamp().getInc(); + } +} + +bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const { + + const OpTime latestKnownOpTime = _latestKnownOpTime(); + + // Rules are: + // - We must have the freshest optime of all the up nodes. + // - We must specifically have a fresher optime than the primary (can't be equal). + // - The term of our last applied op must be less than the current term. This ensures that no + // writes have happened since the most recent election and that the primary is still in + // catchup mode. + + // There is no point to a catchup takeover if we aren't the freshest node because + // another node would immediately perform another catchup takeover when we become primary. + const OpTime ourLastOpApplied = getMyLastAppliedOpTime(); + if (ourLastOpApplied < latestKnownOpTime) { + return false; + } + + if (_currentPrimaryIndex == -1) { + return false; + } + + // If we aren't ahead of the primary, there is no point to having a catchup takeover. + const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime(); + + if (ourLastOpApplied <= primaryLastOpApplied) { + return false; + } + + // If the term of our last applied op is less than the current term, the primary didn't write + // anything and it is still in catchup mode. + return ourLastOpApplied.getTerm() < _term; +} + +bool TopologyCoordinatorImpl::_iAmPrimary() const { + if (_role == Role::leader) { + invariant(_currentPrimaryIndex == _selfIndex); + invariant(_leaderMode != LeaderMode::kNotLeader); + return true; + } + return false; +} + +OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const { + OpTime latest = getMyLastAppliedOpTime(); + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); + ++it) { + // Ignore self + // TODO(russotto): Simplify when heartbeat and spanning tree times are combined. + if (it->isSelf()) { + continue; + } + // Ignore down members + if (!it->up()) { + continue; + } + // Ignore removed nodes (not in config, so not valid). 
+ if (it->getState().removed()) { + continue; + } + + OpTime optime = it->getHeartbeatAppliedOpTime(); + + if (optime > latest) { + latest = optime; + } + } + + return latest; +} + +bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex, + int memberTwoIndex) const { + if (memberOneIndex == -1) + return false; + + if (memberTwoIndex == -1) + return true; + + return _rsConfig.getMemberAt(memberOneIndex).getPriority() > + _rsConfig.getMemberAt(memberTwoIndex).getPriority(); +} + +int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const { + int maxIndex = -1; + for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) { + UnelectableReasonMask reason = currentIndex == _selfIndex + ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout) + : _getUnelectableReason(currentIndex); + if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) { + maxIndex = currentIndex; + } + } + + return maxIndex; +} + +bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() { + if (_leaderMode == LeaderMode::kSteppingDown) { + // Can only be processing one required stepdown at a time. + return false; + } + // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so + // it's safe to transition from kAttemptingStepDown to kSteppingDown. + _setLeaderMode(LeaderMode::kSteppingDown); + return true; +} + +Status TopologyCoordinatorImpl::prepareForStepDownAttempt() { + if (_leaderMode == LeaderMode::kSteppingDown || + _leaderMode == LeaderMode::kAttemptingStepDown) { + return Status{ErrorCodes::ConflictingOperationInProgress, + "This node is already in the process of stepping down"}; + } + _setLeaderMode(LeaderMode::kAttemptingStepDown); + return Status::OK(); +} + +void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() { + if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) { + _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster); + } +} + +void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState, + const Timestamp& electionTime) { + invariant(_selfIndex != -1); + if (newMemberState == getMemberState()) + return; + switch (newMemberState.s) { + case MemberState::RS_PRIMARY: + _role = Role::candidate; + processWinElection(OID(), electionTime); + invariant(_role == Role::leader); + break; + case MemberState::RS_SECONDARY: + case MemberState::RS_ROLLBACK: + case MemberState::RS_RECOVERING: + case MemberState::RS_STARTUP2: + _role = Role::follower; + _followerMode = newMemberState.s; + if (_currentPrimaryIndex == _selfIndex) { + _currentPrimaryIndex = -1; + _setLeaderMode(LeaderMode::kNotLeader); + } + break; + case MemberState::RS_STARTUP: + updateConfig(ReplSetConfig(), -1, Date_t()); + break; + default: + severe() << "Cannot switch to state " << newMemberState; + invariant(false); + } + if (getMemberState() != newMemberState.s) { + severe() << "Expected to enter state " << newMemberState << " but am now in " + << getMemberState(); + invariant(false); + } + log() << newMemberState; +} + +void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { + if (primaryIndex == _selfIndex) { + changeMemberState_forTest(MemberState::RS_PRIMARY); + } else { + if (_iAmPrimary()) { + changeMemberState_forTest(MemberState::RS_SECONDARY); + } + if (primaryIndex != -1) { + ReplSetHeartbeatResponse hbResponse; + hbResponse.setState(MemberState::RS_PRIMARY); + hbResponse.setElectionTime(Timestamp()); + 
hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime()); + hbResponse.setSyncingTo(HostAndPort()); + hbResponse.setHbMsg(""); + _memberData.at(primaryIndex) + .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(), + std::move(hbResponse)); + } + _currentPrimaryIndex = primaryIndex; + } +} + +const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const { + if (_currentPrimaryIndex == -1) + return NULL; + + return &(_rsConfig.getMemberAt(_currentPrimaryIndex)); +} + +void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, + BSONObjBuilder* response, + Status* result) { + // output for each member + vector<BSONObj> membersOut; + const MemberState myState = getMemberState(); + const Date_t now = rsStatusArgs.now; + const OpTime lastOpApplied = getMyLastAppliedOpTime(); + const OpTime lastOpDurable = getMyLastDurableOpTime(); + const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus; + + if (_selfIndex == -1) { + // We're REMOVED or have an invalid config + response->append("state", static_cast<int>(myState.s)); + response->append("stateStr", myState.toString()); + response->append("uptime", rsStatusArgs.selfUptime); + + appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); + + response->appendDate("optimeDate", + Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); + if (_maintenanceModeCalls) { + response->append("maintenanceMode", _maintenanceModeCalls); + } + std::string s = _getHbmsg(now); + if (!s.empty()) + response->append("infoMessage", s); + *result = Status(ErrorCodes::InvalidReplicaSetConfig, + "Our replica set config is invalid or we are not a member of it"); + return; + } + + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + if (itIndex == _selfIndex) { + // add self + BSONObjBuilder bb; + bb.append("_id", _selfConfig().getId()); + bb.append("name", _selfConfig().getHostAndPort().toString()); + bb.append("health", 1.0); + bb.append("state", static_cast<int>(myState.s)); + bb.append("stateStr", myState.toString()); + bb.append("uptime", rsStatusArgs.selfUptime); + if (!_selfConfig().isArbiter()) { + appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); + bb.appendDate("optimeDate", + Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); + } + + if (!_syncSource.empty() && !_iAmPrimary()) { + bb.append("syncingTo", _syncSource.toString()); + } + + if (_maintenanceModeCalls) { + bb.append("maintenanceMode", _maintenanceModeCalls); + } + + std::string s = _getHbmsg(now); + if (!s.empty()) + bb.append("infoMessage", s); + + if (myState.primary()) { + bb.append("electionTime", _electionTime); + bb.appendDate("electionDate", + Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs()))); + } + bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion()); + bb.append("self", true); + membersOut.push_back(bb.obj()); + } else { + // add non-self member + const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex); + BSONObjBuilder bb; + bb.append("_id", itConfig.getId()); + bb.append("name", itConfig.getHostAndPort().toString()); + double h = it->getHealth(); + bb.append("health", h); + const MemberState state = it->getState(); + bb.append("state", static_cast<int>(state.s)); + if (h == 0) { + // if we can't connect the state info is from the past + // and could be confusing to show + 
bb.append("stateStr", "(not reachable/healthy)"); + } else { + bb.append("stateStr", it->getState().toString()); + } + + const unsigned int uptime = static_cast<unsigned int>(( + it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0)); + bb.append("uptime", uptime); + if (!itConfig.isArbiter()) { + appendOpTime( + &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion()); + appendOpTime(&bb, + "optimeDurable", + it->getHeartbeatDurableOpTime(), + _rsConfig.getProtocolVersion()); + + bb.appendDate("optimeDate", + Date_t::fromDurationSinceEpoch( + Seconds(it->getHeartbeatAppliedOpTime().getSecs()))); + bb.appendDate("optimeDurableDate", + Date_t::fromDurationSinceEpoch( + Seconds(it->getHeartbeatDurableOpTime().getSecs()))); + } + bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); + bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); + Milliseconds ping = _getPing(itConfig.getHostAndPort()); + if (ping != UninitializedPing) { + bb.append("pingMs", durationCount<Milliseconds>(ping)); + std::string s = it->getLastHeartbeatMsg(); + if (!s.empty()) + bb.append("lastHeartbeatMessage", s); + } + if (it->hasAuthIssue()) { + bb.append("authenticated", false); + } + const HostAndPort& syncSource = it->getSyncSource(); + if (!syncSource.empty() && !state.primary()) { + bb.append("syncingTo", syncSource.toString()); + } + + if (state == MemberState::RS_PRIMARY) { + bb.append("electionTime", it->getElectionTime()); + bb.appendDate( + "electionDate", + Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs()))); + } + bb.appendIntOrLL("configVersion", it->getConfigVersion()); + membersOut.push_back(bb.obj()); + } + } + + // sort members bson + sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan()); + + response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : ""); + response->append("date", now); + response->append("myState", myState.s); + response->append("term", _term); + + // Add sync source info + if (!_syncSource.empty() && !myState.primary() && !myState.removed()) { + response->append("syncingTo", _syncSource.toString()); + } + + if (_rsConfig.isConfigServer()) { + response->append("configsvr", true); + } + + response->append("heartbeatIntervalMillis", + durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval())); + + // New optimes, to hold them all. + BSONObjBuilder optimes; + _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime"); + if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) { + rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime"); + } + + appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion()); + appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion()); + response->append("optimes", optimes.obj()); + + if (!initialSyncStatus.isEmpty()) { + response->append("initialSyncStatus", initialSyncStatus); + } + + response->append("members", membersOut); + *result = Status::OK(); +} + +StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, + OpTime currentCommittedSnapshotOpTime) const { + BSONObjBuilder cmdBuilder; + invariant(_rsConfig.isInitialized()); + // Do not send updates if we have been removed from the config. 
+ if (_selfIndex == -1) { + return Status(ErrorCodes::NodeNotFound, + "This node is not in the current replset configuration."); + } + cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); + // Create an array containing objects each live member connected to us and for ourself. + BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes")); + for (const auto& memberData : _memberData) { + if (memberData.getLastAppliedOpTime().isNull()) { + // Don't include info on members we haven't heard from yet. + continue; + } + // Don't include members we think are down. + if (!memberData.isSelf() && memberData.lastUpdateStale()) { + continue; + } + + BSONObjBuilder entry(arrayBuilder.subobjStart()); + switch (commandStyle) { + case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle: + memberData.getLastDurableOpTime().append( + &entry, UpdatePositionArgs::kDurableOpTimeFieldName); + memberData.getLastAppliedOpTime().append( + &entry, UpdatePositionArgs::kAppliedOpTimeFieldName); + break; + case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle: + entry.append("_id", memberData.getRid()); + if (_rsConfig.getProtocolVersion() == 1) { + memberData.getLastDurableOpTime().append(&entry, "optime"); + } else { + entry.append("optime", memberData.getLastDurableOpTime().getTimestamp()); + } + break; + } + entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId()); + entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); + } + arrayBuilder.done(); + + // Add metadata to command. Old style parsing logic will reject the metadata. + if (commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) { + prepareReplSetMetadata(currentCommittedSnapshotOpTime) + .writeToMetadata(&cmdBuilder) + .transitional_ignore(); + } + return cmdBuilder.obj(); +} + +void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) { + BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); + { + for (const auto& memberData : _memberData) { + BSONObjBuilder entry(replicationProgress.subobjStart()); + entry.append("rid", memberData.getRid()); + const auto lastDurableOpTime = memberData.getLastDurableOpTime(); + if (_rsConfig.getProtocolVersion() == 1) { + BSONObjBuilder opTime(entry.subobjStart("optime")); + opTime.append("ts", lastDurableOpTime.getTimestamp()); + opTime.append("term", lastDurableOpTime.getTerm()); + opTime.done(); + } else { + entry.append("optime", lastDurableOpTime.getTimestamp()); + } + entry.append("host", memberData.getHostAndPort().toString()); + if (_selfIndex >= 0) { + const int memberId = memberData.getMemberId(); + invariant(memberId >= 0); + entry.append("memberId", memberId); + } + } + } +} + +void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { + const MemberState myState = getMemberState(); + if (!_rsConfig.isInitialized()) { + response->markAsNoConfig(); + return; + } + + for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); + ++it) { + if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) { + continue; + } + + if (it->isElectable()) { + response->addHost(it->getHostAndPort()); + } else if (it->isArbiter()) { + response->addArbiter(it->getHostAndPort()); + } else { + response->addPassive(it->getHostAndPort()); + } + } + + response->setReplSetName(_rsConfig.getReplSetName()); + if (myState.removed()) { + response->markAsNoConfig(); + return; + } + + 
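+    // For illustration only (host names and values below are hypothetical), the fields populated
+    // above and below yield an isMaster document along these lines:
+    //   { "setName" : "rs0", "setVersion" : 2, "ismaster" : true, "secondary" : false,
+    //     "hosts" : [ "h1:27017", "h2:27017" ], "passives" : [ "h3:27017" ],
+    //     "arbiters" : [ "h4:27017" ], "primary" : "h1:27017", "me" : "h1:27017" }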
+    response->setReplSetVersion(_rsConfig.getConfigVersion());
+    response->setIsMaster(myState.primary());
+    response->setIsSecondary(myState.secondary());
+
+    const MemberConfig* curPrimary = _currentPrimaryMember();
+    if (curPrimary) {
+        response->setPrimary(curPrimary->getHostAndPort());
+    }
+
+    const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
+    if (selfConfig.isArbiter()) {
+        response->setIsArbiterOnly(true);
+    } else if (selfConfig.getPriority() == 0) {
+        response->setIsPassive(true);
+    }
+    if (selfConfig.getSlaveDelay() > Seconds(0)) {
+        response->setSlaveDelay(selfConfig.getSlaveDelay());
+    }
+    if (selfConfig.isHidden()) {
+        response->setIsHidden(true);
+    }
+    if (!selfConfig.shouldBuildIndexes()) {
+        response->setShouldBuildIndexes(false);
+    }
+    const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
+    if (selfConfig.hasTags(tagConfig)) {
+        for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
+             ++tag) {
+            std::string tagKey = tagConfig.getTagKey(*tag);
+            if (tagKey[0] == '$') {
+                // Filter out internal tags
+                continue;
+            }
+            response->addTag(tagKey, tagConfig.getTagValue(*tag));
+        }
+    }
+    response->setMe(selfConfig.getHostAndPort());
+    if (_iAmPrimary()) {
+        response->setElectionId(_electionId);
+    }
+}
+
+StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult>
+TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
+    if (_role != TopologyCoordinator::Role::follower) {
+        std::string msg = str::stream()
+            << "cannot freeze node when primary or running for election. state: "
+            << (_role == TopologyCoordinator::Role::leader ? "Primary" : "Running-Election");
+        log() << msg;
+        return Status(ErrorCodes::NotSecondary, msg);
+    }
+
+    if (secs == 0) {
+        _stepDownUntil = now;
+        log() << "'unfreezing'";
+        response->append("info", "unfreezing");
+
+        if (_isElectableNodeInSingleNodeReplicaSet()) {
+            // If we are a one-node replica set, we're the one member,
+            // we're electable, we're not in maintenance mode, and we are currently in followerMode
+            // SECONDARY, we must transition to candidate now that our stepdown period
+            // is no longer active, in lieu of heartbeats.
+            _role = Role::candidate;
+            return PrepareFreezeResponseResult::kElectSelf;
+        }
+    } else {
+        if (secs == 1)
+            response->append("warning", "you really want to freeze for only 1 second?");
+
+        _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
+        log() << "'freezing' for " << secs << " seconds";
+    }
+
+    return PrepareFreezeResponseResult::kNoAction;
+}
+
+bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
+    if (_stepDownUntil > now) {
+        return false;
+    }
+
+    if (_isElectableNodeInSingleNodeReplicaSet()) {
+        // If the new config describes a one-node replica set, we're the one member,
+        // we're electable, we're not in maintenance mode, and we are currently in followerMode
+        // SECONDARY, we must transition to candidate, in lieu of heartbeats.
+ _role = Role::candidate; + return true; + } + return false; +} + +void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) { + if (_electionSleepUntil < newTime) { + _electionSleepUntil = newTime; + } +} + +Timestamp TopologyCoordinatorImpl::getElectionTime() const { + return _electionTime; +} + +OID TopologyCoordinatorImpl::getElectionId() const { + return _electionId; +} + +int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const { + return _currentPrimaryIndex; +} + +Date_t TopologyCoordinatorImpl::getStepDownTime() const { + return _stepDownUntil; +} + +void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, + int selfIndex, + Date_t now) { + std::vector<MemberData> oldHeartbeats; + _memberData.swap(oldHeartbeats); + + int index = 0; + for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd(); + ++it, ++index) { + const MemberConfig& newMemberConfig = *it; + MemberData newHeartbeatData; + for (auto&& oldMemberData : oldHeartbeats) { + if ((oldMemberData.getMemberId() == newMemberConfig.getId() && + oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) || + (index == selfIndex && oldMemberData.isSelf())) { + // This member existed in the old config with the same member ID and + // HostAndPort, so copy its heartbeat data over. + newHeartbeatData = oldMemberData; + break; + } + } + newHeartbeatData.setConfigIndex(index); + newHeartbeatData.setIsSelf(index == selfIndex); + newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort()); + newHeartbeatData.setMemberId(newMemberConfig.getId()); + _memberData.push_back(newHeartbeatData); + } + if (selfIndex < 0) { + // It's necessary to have self member data even if self isn't in the configuration. + // We don't need data for the other nodes (which no longer know about us, or soon won't) + _memberData.clear(); + // We're not in the config, we can't sync any more. + _syncSource = HostAndPort(); + MemberData newHeartbeatData; + for (auto&& oldMemberData : oldHeartbeats) { + if (oldMemberData.isSelf()) { + newHeartbeatData = oldMemberData; + break; + } + } + newHeartbeatData.setConfigIndex(-1); + newHeartbeatData.setIsSelf(true); + _memberData.push_back(newHeartbeatData); + } +} + +// This function installs a new config object and recreates MemberData objects +// that reflect the new config. +void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig, + int selfIndex, + Date_t now) { + invariant(_role != Role::candidate); + invariant(selfIndex < newConfig.getNumMembers()); + + // Reset term on startup and upgrade/downgrade of protocol version. + if (!_rsConfig.isInitialized() || + _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) { + if (newConfig.getProtocolVersion() == 1) { + _term = OpTime::kInitialTerm; + } else { + invariant(newConfig.getProtocolVersion() == 0); + _term = OpTime::kUninitializedTerm; + } + LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config"; + } + + _updateHeartbeatDataForReconfig(newConfig, selfIndex, now); + _rsConfig = newConfig; + _selfIndex = selfIndex; + _forceSyncSourceIndex = -1; + + if (_role == Role::leader) { + if (_selfIndex == -1) { + log() << "Could not remain primary because no longer a member of the replica set"; + } else if (!_selfConfig().isElectable()) { + log() << " Could not remain primary because no longer electable"; + } else { + // Don't stepdown if you don't have to. 
+            _currentPrimaryIndex = _selfIndex;
+            return;
+        }
+        _role = Role::follower;
+        _setLeaderMode(LeaderMode::kNotLeader);
+    }
+
+    // By this point we know we are in Role::follower
+    _currentPrimaryIndex = -1;  // force secondaries to re-detect who the primary is
+
+    if (_isElectableNodeInSingleNodeReplicaSet()) {
+        // If the new config describes a one-node replica set, we're the one member,
+        // we're electable, we're not in maintenance mode and we are currently in followerMode
+        // SECONDARY, we must transition to candidate, in lieu of heartbeats.
+        _role = Role::candidate;
+    }
+}
+std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
+    // ignore messages over 2 minutes old
+    if ((now - _hbmsgTime) > Seconds{120}) {
+        return "";
+    }
+    return _hbmsg;
+}
+
+void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
+    _hbmsgTime = now;
+    _hbmsg = message;
+}
+
+const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
+    return _rsConfig.getMemberAt(_selfIndex);
+}
+
+const MemberData& TopologyCoordinatorImpl::_selfMemberData() const {
+    return _memberData[_selfMemberDataIndex()];
+}
+
+const int TopologyCoordinatorImpl::_selfMemberDataIndex() const {
+    invariant(!_memberData.empty());
+    if (_selfIndex >= 0)
+        return _selfIndex;
+    // In master-slave mode, the first entry is for self. If there is no config
+    // or we're not in the config, the first-and-only entry should be for self.
+    return 0;
+}
+
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
+    int index) const {
+    invariant(index != _selfIndex);
+    const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
+    const MemberData& hbData = _memberData.at(index);
+    UnelectableReasonMask result = None;
+    if (memberConfig.isArbiter()) {
+        result |= ArbiterIAm;
+    }
+    if (memberConfig.getPriority() <= 0) {
+        result |= NoPriority;
+    }
+    if (hbData.getState() != MemberState::RS_SECONDARY) {
+        result |= NotSecondary;
+    }
+    if (_rsConfig.getProtocolVersion() == 0 &&
+        !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
+        result |= NotCloseEnoughToLatestOptime;
+    }
+    if (hbData.up() && hbData.isUnelectable()) {
+        result |= RefusesToStand;
+    }
+    invariant(result || memberConfig.isElectable());
+    return result;
+}
+
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
+    const Date_t now, StartElectionReason reason) const {
+    UnelectableReasonMask result = None;
+    const OpTime lastApplied = getMyLastAppliedOpTime();
+    if (lastApplied.isNull()) {
+        result |= NoData;
+    }
+    if (!_aMajoritySeemsToBeUp()) {
+        result |= CannotSeeMajority;
+    }
+    if (_selfIndex == -1) {
+        result |= NotInitialized;
+        return result;
+    }
+    if (_selfConfig().isArbiter()) {
+        result |= ArbiterIAm;
+    }
+    if (_selfConfig().getPriority() <= 0) {
+        result |= NoPriority;
+    }
+    if (_stepDownUntil > now) {
+        result |= StepDownPeriodActive;
+    }
+
+    // Cannot be electable unless secondary or already primary
+    if (!getMemberState().secondary() && !_iAmPrimary()) {
+        result |= NotSecondary;
+    }
+
+    if (_rsConfig.getProtocolVersion() == 0) {
+        // Election rules only for protocol version 0.
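+        // (Under PV0 a node cannot stand if it granted its vote to another member within the
+        // last VoteLease::leaseTime, or if its last optime is more than 10 seconds behind the
+        // freshest member's; both checks appear below.)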
+ if (_voteLease.whoId != -1 && + _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() && + _voteLease.when + VoteLease::leaseTime >= now) { + result |= VotedTooRecently; + } + if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) { + result |= NotCloseEnoughToLatestOptime; + } + } else { + // Election rules only for protocol version 1. + invariant(_rsConfig.getProtocolVersion() == 1); + if (reason == StartElectionReason::kPriorityTakeover && + !_amIFreshEnoughForPriorityTakeover()) { + result |= NotCloseEnoughToLatestForPriorityTakeover; + } + + if (reason == StartElectionReason::kCatchupTakeover && + !_amIFreshEnoughForCatchupTakeover()) { + result |= NotFreshEnoughForCatchupTakeover; + } + } + return result; +} + +std::string TopologyCoordinatorImpl::_getUnelectableReasonString( + const UnelectableReasonMask ur) const { + invariant(ur); + str::stream ss; + bool hasWrittenToStream = false; + if (ur & NoData) { + ss << "node has no applied oplog entries"; + hasWrittenToStream = true; + } + if (ur & VotedTooRecently) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "I recently voted for " << _voteLease.whoHostAndPort.toString(); + } + if (ur & CannotSeeMajority) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "I cannot see a majority"; + } + if (ur & ArbiterIAm) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member is an arbiter"; + } + if (ur & NoPriority) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member has zero priority"; + } + if (ur & StepDownPeriodActive) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "I am still waiting for stepdown period to end at " + << dateToISOStringLocal(_stepDownUntil); + } + if (ur & NotSecondary) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member is not currently a secondary"; + } + if (ur & NotCloseEnoughToLatestOptime) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member is more than 10 seconds behind the most up-to-date member"; + } + if (ur & NotCloseEnoughToLatestForPriorityTakeover) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member is not caught up enough to the most up-to-date member to call for priority " + "takeover - must be within " + << priorityTakeoverFreshnessWindowSeconds << " seconds"; + } + if (ur & NotFreshEnoughForCatchupTakeover) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "member is either not the most up-to-date member or not ahead of the primary, and " + "therefore cannot call for catchup takeover"; + } + if (ur & NotInitialized) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "node is not a member of a valid replica set configuration"; + } + if (ur & RefusesToStand) { + if (hasWrittenToStream) { + ss << "; "; + } + hasWrittenToStream = true; + ss << "most recent heartbeat indicates node will not stand for election"; + } + if (!hasWrittenToStream) { + severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur); + fassertFailed(26011); + } + ss << " (mask 0x" << integerToHex(ur) << ")"; + return ss; +} + +Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) { + return _pings[host].getMillis(); +} + +void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) { + _electionTime = 
newElectionTime; +} + +int TopologyCoordinatorImpl::_getTotalPings() { + PingMap::iterator it = _pings.begin(); + PingMap::iterator end = _pings.end(); + int totalPings = 0; + while (it != end) { + totalPings += it->second.getCount(); + it++; + } + return totalPings; +} + +std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const { + std::vector<HostAndPort> upHosts; + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + if (itIndex == _selfIndex) { + continue; // skip ourselves + } + if (!it->maybeUp()) { + continue; // skip DOWN nodes + } -TopologyCoordinator::Role::Role(int value) : _value(value) {} + upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort()); + } + return upHosts; +} -std::string TopologyCoordinator::Role::toString() const { - switch (_value) { - case kLeaderValue: - return "leader"; - case kFollowerValue: - return "follower"; - case kCandidateValue: - return "candidate"; +bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { + if (_role != Role::candidate) { + return false; } - invariant(false); + int selfId = _selfConfig().getId(); + if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) { + log() << "not voting yea for " << selfId << " voted for " + << _voteLease.whoHostAndPort.toString() << ' ' + << durationCount<Seconds>(now - _voteLease.when) << " secs ago"; + return false; + } + _voteLease.when = now; + _voteLease.whoId = selfId; + _voteLease.whoHostAndPort = _selfConfig().getHostAndPort(); + return true; } -TopologyCoordinator::~TopologyCoordinator() {} +bool TopologyCoordinatorImpl::isSteppingDown() const { + return _leaderMode == LeaderMode::kAttemptingStepDown || + _leaderMode == LeaderMode::kSteppingDown; +} -std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) { - return os << role.toString(); +void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) { + // Invariants for valid state transitions. 
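+    // For example, a normal election runs kNotLeader -> kLeaderElect -> kMaster; a stepdown
+    // command attempts kMaster -> kAttemptingStepDown and then either completes to kNotLeader or
+    // returns to kMaster if the attempt is abandoned; an unconditional (heartbeat- or term-driven)
+    // stepdown goes through kSteppingDown -> kNotLeader.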
+ switch (_leaderMode) { + case LeaderMode::kNotLeader: + invariant(newMode == LeaderMode::kLeaderElect); + break; + case LeaderMode::kLeaderElect: + invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case + newMode == LeaderMode::kMaster || + newMode == LeaderMode::kAttemptingStepDown || + newMode == LeaderMode::kSteppingDown); + break; + case LeaderMode::kMaster: + invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case + newMode == LeaderMode::kAttemptingStepDown || + newMode == LeaderMode::kSteppingDown); + break; + case LeaderMode::kAttemptingStepDown: + invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster || + newMode == LeaderMode::kSteppingDown); + break; + case LeaderMode::kSteppingDown: + invariant(newMode == LeaderMode::kNotLeader); + break; + } + _leaderMode = std::move(newMode); } -std::ostream& operator<<(std::ostream& os, - TopologyCoordinator::PrepareFreezeResponseResult result) { - switch (result) { - case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction: - return os << "no action"; - case TopologyCoordinator::PrepareFreezeResponseResult::kElectSelf: - return os << "elect self"; +MemberState TopologyCoordinatorImpl::getMemberState() const { + if (_selfIndex == -1) { + if (_rsConfig.isInitialized()) { + return MemberState::RS_REMOVED; + } + return MemberState::RS_STARTUP; + } + + if (_rsConfig.isConfigServer()) { + if (_options.clusterRole != ClusterRole::ConfigServer) { + return MemberState::RS_REMOVED; + } else { + invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown); + if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) { + return MemberState::RS_REMOVED; + } + } + } else { + if (_options.clusterRole == ClusterRole::ConfigServer) { + return MemberState::RS_REMOVED; + } + } + + if (_role == Role::leader) { + invariant(_currentPrimaryIndex == _selfIndex); + invariant(_leaderMode != LeaderMode::kNotLeader); + return MemberState::RS_PRIMARY; + } + const MemberConfig& myConfig = _selfConfig(); + if (myConfig.isArbiter()) { + return MemberState::RS_ARBITER; + } + if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) && + (_followerMode == MemberState::RS_SECONDARY)) { + return MemberState::RS_RECOVERING; + } + return _followerMode; +} + +bool TopologyCoordinatorImpl::canAcceptWrites() const { + return _leaderMode == LeaderMode::kMaster; +} + +void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) { + invariant(_role == Role::leader); + _electionTime = electionOpTime; + _electionId = electionId; +} + +void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) { + invariant(_role == Role::candidate); + invariant(_leaderMode == LeaderMode::kNotLeader); + _role = Role::leader; + _setLeaderMode(LeaderMode::kLeaderElect); + setElectionInfo(electionId, electionOpTime); + _currentPrimaryIndex = _selfIndex; + _syncSource = HostAndPort(); + _forceSyncSourceIndex = -1; + // Prevent last committed optime from updating until we finish draining. 
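+    // The placeholder OpTime(Timestamp(INT_MAX, 0), INT_MAX) set below compares greater than any
+    // real optime, so advanceLastCommittedOpTime() ignores proposed commit points while we are
+    // primary until completeTransitionToPrimary() installs the actual first optime of this term.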
+    _firstOpTimeOfMyTerm =
+        OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
+}
+
+void TopologyCoordinatorImpl::processLoseElection() {
+    invariant(_role == Role::candidate);
+    invariant(_leaderMode == LeaderMode::kNotLeader);
+    const HostAndPort syncSourceAddress = getSyncSourceAddress();
+    _electionTime = Timestamp(0, 0);
+    _electionId = OID();
+    _role = Role::follower;
+
+    // Clear voteLease time, if we voted for ourselves in this election.
+    // This will allow us to vote for others.
+    if (_voteLease.whoId == _selfConfig().getId()) {
+        _voteLease.when = Date_t();
+    }
+}
+
+bool TopologyCoordinatorImpl::attemptStepDown(
+    long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
+
+    if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) {
+        uasserted(ErrorCodes::PrimarySteppedDown,
+                  "While waiting for secondaries to catch up before stepping down, "
+                  "this node decided to step down for other reasons");
+    }
+    invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
+
+    if (now >= stepDownUntil) {
+        uasserted(ErrorCodes::ExceededTimeLimit,
+                  "By the time we were ready to step down, we were already past the "
+                  "time we were supposed to step down until");
+    }
+
+    if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
+        // Stepdown attempt failed.
+
+        // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
+        // if secondaryCatchUpPeriodSecs == 0.
+        if (now >= waitUntil) {
+            uasserted(ErrorCodes::ExceededTimeLimit,
+                      str::stream() << "No electable secondaries caught up as of "
+                                    << dateToISOStringLocal(now)
+                                    << ". Please use the replSetStepDown command with the argument "
+                                    << "{force: true} to force node to step down.");
+        }
+
+        // Stepdown attempt failed, but in a way that can be retried
+        return false;
+    }
+
+    // Stepdown attempt success!
+    _stepDownUntil = stepDownUntil;
+    _stepDownSelfAndReplaceWith(-1);
+    return true;
+}
+
+bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
+                                                          Date_t waitUntil,
+                                                          bool force) {
+    const bool forceNow = force && (now >= waitUntil);
+    if (forceNow) {
+        return true;
+    }
+
+    return isSafeToStepDown();
+}
+
+bool TopologyCoordinatorImpl::isSafeToStepDown() {
+    if (!_rsConfig.isInitialized() || _selfIndex < 0) {
+        return false;
+    }
+
+    OpTime lastApplied = getMyLastAppliedOpTime();
+
+    auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
+    invariant(tagStatus.isOK());
+
+    // Check if a majority of nodes have reached the last applied optime.
+    if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
+        return false;
+    }
+
+    // Now check that we also have at least one caught up node that is electable.
+    const OpTime lastOpApplied = getMyLastAppliedOpTime();
+    for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
+        // Ignore ourselves.
+        if (memberIndex == _selfIndex) {
+            continue;
+        }
+        UnelectableReasonMask reason = _getUnelectableReason(memberIndex);
+        if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) {
+            // Found a caught up and electable node, succeed with step down.
+ return true; + } + } + + return false; +} + +void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) { + invariant(_role == Role::follower); + switch (newMode) { + case MemberState::RS_RECOVERING: + case MemberState::RS_ROLLBACK: + case MemberState::RS_SECONDARY: + case MemberState::RS_STARTUP2: + _followerMode = newMode; + break; + default: + invariant(false); + } + + if (_followerMode != MemberState::RS_SECONDARY) { + return; + } + + // When a single node replica set transitions to SECONDARY, we must check if we should + // be a candidate here. This is necessary because a single node replica set has no + // heartbeats that would normally change the role to candidate. + + if (_isElectableNodeInSingleNodeReplicaSet()) { + _role = Role::candidate; + } +} + +bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const { + return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 && + _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() && + _maintenanceModeCalls == 0; +} + +void TopologyCoordinatorImpl::finishUnconditionalStepDown() { + invariant(_leaderMode == LeaderMode::kSteppingDown); + + int remotePrimaryIndex = -1; + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + if (itIndex == _selfIndex) { + continue; + } + + if (it->getState().primary() && it->up()) { + if (remotePrimaryIndex != -1) { + // two other nodes think they are primary (asynchronously polled) + // -- wait for things to settle down. + remotePrimaryIndex = -1; + warning() << "two remote primaries (transiently)"; + break; + } + remotePrimaryIndex = itIndex; + } + } + _stepDownSelfAndReplaceWith(remotePrimaryIndex); +} + +void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) { + invariant(_role == Role::leader); + invariant(_selfIndex != -1); + invariant(_selfIndex != newPrimary); + invariant(_selfIndex == _currentPrimaryIndex); + _currentPrimaryIndex = newPrimary; + _role = Role::follower; + _setLeaderMode(LeaderMode::kNotLeader); +} + +bool TopologyCoordinatorImpl::updateLastCommittedOpTime() { + // If we're not primary or we're stepping down due to learning of a new term then we must not + // advance the commit point. If we are stepping down due to a user request, however, then it + // is safe to advance the commit point, and in fact we must since the stepdown request may be + // waiting for the commit point to advance enough to be able to safely complete the step down. + if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) { + return false; + } + + // Whether we use the applied or durable OpTime for the commit point is decided here. + const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal(); + + std::vector<OpTime> votingNodesOpTimes; + for (const auto& memberData : _memberData) { + int memberIndex = memberData.getConfigIndex(); + invariant(memberIndex >= 0); + const auto& memberConfig = _rsConfig.getMemberAt(memberIndex); + if (memberConfig.isVoter()) { + const auto opTime = useDurableOpTime ? 
memberData.getLastDurableOpTime() + : memberData.getLastAppliedOpTime(); + votingNodesOpTimes.push_back(opTime); + } + } + + invariant(votingNodesOpTimes.size() > 0); + if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) { + return false; + } + std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); + + // need the majority to have this OpTime + OpTime committedOpTime = + votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; + return advanceLastCommittedOpTime(committedOpTime); +} + +bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) { + if (committedOpTime == _lastCommittedOpTime) { + return false; // Hasn't changed, so ignore it. + } else if (committedOpTime < _lastCommittedOpTime) { + LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime + << ", currentCommittedOpTime: " << _lastCommittedOpTime; + return false; // This may have come from an out-of-order heartbeat. Ignore it. + } + + // This check is performed to ensure primaries do not commit an OpTime from a previous term. + if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) { + LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: " + << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm; + return false; + } + + LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime; + _lastCommittedOpTime = committedOpTime; + return true; +} + +OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const { + return _lastCommittedOpTime; +} + +bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary( + long long termWhenDrainCompleted) const { + + if (termWhenDrainCompleted != _term) { + return false; + } + // Allow completing the transition to primary even when in the middle of a stepdown attempt, + // in case the stepdown attempt fails. + if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) { + return false; + } + + return true; +} + +Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) { + if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) { + return Status(ErrorCodes::PrimarySteppedDown, + "By the time this node was ready to complete its transition to PRIMARY it " + "was no longer eligible to do so"); + } + if (_leaderMode == LeaderMode::kLeaderElect) { + _setLeaderMode(LeaderMode::kMaster); + } + _firstOpTimeOfMyTerm = firstOpTimeOfTerm; + return Status::OK(); +} + +void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) { + invariant(_role == Role::follower); + _maintenanceModeCalls += inc; + invariant(_maintenanceModeCalls >= 0); +} + +int TopologyCoordinatorImpl::getMaintenanceCount() const { + return _maintenanceModeCalls; +} + +TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term, + Date_t now) { + if (term <= _term) { + return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate; + } + // Don't run election if we just stood up or learned about a new term. + _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod(); + + // Don't update the term just yet if we are going to step down, as we don't want to report + // that we are primary in the new term. 
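+    // For example, with _term == 3, receiving term 5 while primary returns kTriggerStepDown and
+    // leaves _term at 3; a later updateTerm() call, made once we are no longer primary, is what
+    // actually advances the term.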
+ if (_iAmPrimary()) { + return TopologyCoordinator::UpdateTermResult::kTriggerStepDown; + } + LOG(1) << "Updating term from " << _term << " to " << term; + _term = term; + return TopologyCoordinator::UpdateTermResult::kUpdatedTerm; +} + + +long long TopologyCoordinatorImpl::getTerm() { + return _term; +} + +// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the +// replset. Passing metadata is unnecessary. +bool TopologyCoordinatorImpl::shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, + Date_t now) const { + // Methodology: + // If there exists a viable sync source member other than currentSource, whose oplog has + // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's, + // return true. + // If the currentSource has the same replication progress as we do and has no source for further + // progress, return true. + + if (_selfIndex == -1) { + log() << "Not choosing new sync source because we are not in the config."; + return false; + } + + // If the user requested a sync source change, return true. + if (_forceSyncSourceIndex != -1) { + log() << "Choosing new sync source because the user has requested to use " + << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort() + << " as a sync source"; + return true; + } + + if (_rsConfig.getProtocolVersion() == 1 && + replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { + log() << "Choosing new sync source because the config version supplied by " << currentSource + << ", " << replMetadata.getConfigVersion() << ", does not match ours, " + << _rsConfig.getConfigVersion(); + return true; + } + + const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); + // PV0 doesn't use metadata, we have to consult _rsConfig. + if (currentSourceIndex == -1) { + log() << "Choosing new sync source because " << currentSource.toString() + << " is not in our config"; + return true; + } + + invariant(currentSourceIndex != _selfIndex); + + // If OplogQueryMetadata was provided, use its values, otherwise use the ones in + // ReplSetMetadata. + OpTime currentSourceOpTime; + int syncSourceIndex = -1; + int primaryIndex = -1; + if (oqMetadata) { + currentSourceOpTime = + std::max(oqMetadata->getLastOpApplied(), + _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); + syncSourceIndex = oqMetadata->getSyncSourceIndex(); + primaryIndex = oqMetadata->getPrimaryIndex(); + } else { + currentSourceOpTime = + std::max(replMetadata.getLastOpVisible(), + _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); + syncSourceIndex = replMetadata.getSyncSourceIndex(); + primaryIndex = replMetadata.getPrimaryIndex(); + } + + if (currentSourceOpTime.isNull()) { + // Haven't received a heartbeat from the sync source yet, so can't tell if we should + // change. + return false; + } + + // Change sync source if they are not ahead of us, and don't have a sync source, + // unless they are primary. 
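+    // For example, if our sync source reports the same optime that we have applied, reports no
+    // sync source of its own (syncSourceIndex == -1), and is not the primary, then fetching from
+    // it cannot make progress, so we ask for a new source.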
+ const OpTime myLastOpTime = getMyLastAppliedOpTime(); + if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 && + currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) { + std::stringstream logMessage; + logMessage << "Choosing new sync source because our current sync source, " + << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime + << ") which is not ahead of ours (" << myLastOpTime + << "), it does not have a sync source, and it's not the primary"; + if (primaryIndex >= 0) { + logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)"; + } else { + logMessage << " (sync source does not know the primary)"; + } + log() << logMessage.str(); + return true; + } + + if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) { + log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent " + "OpTime, " + << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource + << ", against the OpTimes of the other nodes in this replica set."; + } else { + unsigned int currentSecs = currentSourceOpTime.getSecs(); + unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs); + + for (std::vector<MemberData>::const_iterator it = _memberData.begin(); + it != _memberData.end(); + ++it) { + const int itIndex = indexOfIterator(_memberData, it); + const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex); + if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) && + (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) && + it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) && + goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) { + log() << "Choosing new sync source because the most recent OpTime of our sync " + "source, " + << currentSource << ", is " << currentSourceOpTime.toString() + << " which is more than " << _options.maxSyncSourceLagSecs + << " behind member " << candidateConfig.getHostAndPort().toString() + << " whose most recent OpTime is " + << it->getHeartbeatAppliedOpTime().toString(); + invariant(itIndex != _selfIndex); + return true; + } + } + } + + return false; +} + +rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata( + const OpTime& lastVisibleOpTime) const { + return rpc::ReplSetMetadata(_term, + _lastCommittedOpTime, + lastVisibleOpTime, + _rsConfig.getConfigVersion(), + _rsConfig.getReplicaSetId(), + _currentPrimaryIndex, + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); +} + +rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const { + return rpc::OplogQueryMetadata(_lastCommittedOpTime, + getMyLastAppliedOpTime(), + rbid, + _currentPrimaryIndex, + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); +} + +void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { + output->setConfig(_rsConfig); + output->setHBData(_memberData); + output->setSelfIndex(_selfIndex); + output->setPrimaryIndex(_currentPrimaryIndex); + output->setSelfState(getMemberState()); + output->setSelfHeartbeatMessage(_hbmsg); +} + +void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) { + response->setTerm(_term); + + if (args.getTerm() < _term) { + response->setVoteGranted(false); + response->setReason(str::stream() << "candidate's term (" << args.getTerm() + << ") is lower than mine (" + << _term + << ")"); 
+ } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) { + response->setVoteGranted(false); + response->setReason(str::stream() << "candidate's config version (" + << args.getConfigVersion() + << ") differs from mine (" + << _rsConfig.getConfigVersion() + << ")"); + } else if (args.getSetName() != _rsConfig.getReplSetName()) { + response->setVoteGranted(false); + response->setReason(str::stream() << "candidate's set name (" << args.getSetName() + << ") differs from mine (" + << _rsConfig.getReplSetName() + << ")"); + } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) { + response->setVoteGranted(false); + response + ->setReason(str::stream() + << "candidate's data is staler than mine. candidate's last applied OpTime: " + << args.getLastDurableOpTime().toString() + << ", my last applied OpTime: " + << getMyLastAppliedOpTime().toString()); + } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) { + response->setVoteGranted(false); + response->setReason(str::stream() + << "already voted for another candidate (" + << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort() + << ") this term (" + << _lastVote.getTerm() + << ")"); + } else { + int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex()); + if (_selfConfig().isArbiter() && betterPrimary >= 0) { + response->setVoteGranted(false); + response->setReason(str::stream() + << "can see a healthy primary (" + << _rsConfig.getMemberAt(betterPrimary).getHostAndPort() + << ") of equal or greater priority"); + } else { + if (!args.isADryRun()) { + _lastVote.setTerm(args.getTerm()); + _lastVote.setCandidateIndex(args.getCandidateIndex()); + } + response->setVoteGranted(true); + } + } +} + +void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) { + _lastVote = lastVote; +} + +void TopologyCoordinatorImpl::voteForMyselfV1() { + _lastVote.setTerm(_term); + _lastVote.setCandidateIndex(_selfIndex); +} + +void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) { + _currentPrimaryIndex = primaryIndex; +} + +Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now, + StartElectionReason reason) { + if (_role == Role::leader) { + return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"}; + } + + if (_role == Role::candidate) { + return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; + } + + const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason); + if (unelectableReason) { + return {ErrorCodes::NodeNotElectable, + str::stream() << "Not standing for election because " + << _getUnelectableReasonString(unelectableReason)}; + } + + // All checks passed, become a candidate and start election proceedings. + _role = Role::candidate; + + return Status::OK(); +} + +void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) { + _storageEngineSupportsReadCommitted = + supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; +} + +void TopologyCoordinatorImpl::restartHeartbeats() { + for (auto& hb : _memberData) { + hb.restart(); + } +} + +boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { + // The smallest OpTime in PV1. 
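+    // (Timestamp(0, 0) with term 0; any real heartbeat optime compares greater, so 'latest' ends
+    // up as the newest applied optime reported by the up, recently-heard-from peers.)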
+ OpTime latest(Timestamp(0, 0), 0); + for (size_t i = 0; i < _memberData.size(); i++) { + auto& peer = _memberData[i]; + + if (static_cast<int>(i) == _selfIndex) { + continue; + } + // If any heartbeat is not fresh enough, return none. + if (!peer.isUpdatedSinceRestart()) { + return boost::none; + } + // Ignore down members + if (!peer.up()) { + continue; + } + if (peer.getHeartbeatAppliedOpTime() > latest) { + latest = peer.getHeartbeatAppliedOpTime(); + } } - MONGO_UNREACHABLE; + return latest; } } // namespace repl diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index ed00fecd31f..046ff2217a6 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -28,737 +28,502 @@ #pragma once -#include <iosfwd> #include <string> - -#include "mongo/base/disallow_copying.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" +#include <vector> + +#include "mongo/bson/timestamp.h" +#include "mongo/db/repl/last_vote.h" +#include "mongo/db/repl/member_data.h" +#include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/net/hostandport.h" +#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/server_options.h" #include "mongo/util/time_support.h" namespace mongo { -class Timestamp; +class OperationContext; + +namespace rpc { +class ReplSetMetadata; +} // namespace rpc namespace repl { -class HeartbeatResponseAction; -class MemberData; -class OpTime; -class ReplSetHeartbeatArgs; -class ReplSetConfig; -class TagSubgroup; -class LastVote; -struct MemberState; +static const Milliseconds UninitializedPing{-1}; /** - * Replication Topology Coordinator interface. + * Represents a latency measurement for each replica set member based on heartbeat requests. + * The measurement is an average weighted 80% to the old value, and 20% to the new value. * - * This object is responsible for managing the topology of the cluster. - * Tasks include consensus and leader election, chaining, and configuration management. - * Methods of this class should be non-blocking. + * Also stores information about heartbeat progress and retries. */ -class TopologyCoordinator { - MONGO_DISALLOW_COPYING(TopologyCoordinator); - +class PingStats { public: - class Role; - - virtual ~TopologyCoordinator(); - /** - * Different modes a node can be in while still reporting itself as in state PRIMARY. - * - * Valid transitions: - * - * kNotLeader <---------------------------------- - * | | - * | | - * | | - * v | - * kLeaderElect----- | - * | | | - * | | | - * v | | - * kMaster ------------------------- | - * | ^ | | | - * | | ------------------- | | - * | | | | | | - * v | v v v | - * kAttemptingStepDown----------->kSteppingDown | - * | | | - * | | | - * | | | - * --------------------------------------------- + * Records that a new heartbeat request started at "now". * + * This resets the failure count used in determining whether the next request to a target + * should be a retry or a regularly scheduled heartbeat message. */ - enum class LeaderMode { - kNotLeader, // This node is not currently a leader. - kLeaderElect, // This node has been elected leader, but can't yet accept writes. - kMaster, // This node reports ismaster:true and can accept writes. - kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete. 
- kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail. - }; - - //////////////////////////////////////////////////////////// - // - // State inspection methods. - // - //////////////////////////////////////////////////////////// + void start(Date_t now); /** - * Gets the role of this member in the replication protocol. + * Records that a heartbeat request completed successfully, and that "millis" milliseconds + * were spent for a single network roundtrip plus remote processing time. */ - virtual Role getRole() const = 0; + void hit(Milliseconds millis); /** - * Gets the MemberState of this member in the replica set. + * Records that a heartbeat request failed. */ - virtual MemberState getMemberState() const = 0; + void miss(); /** - * Returns whether this node should be allowed to accept writes. + * Gets the number of hit() calls. */ - virtual bool canAcceptWrites() const = 0; + unsigned int getCount() const { + return count; + } /** - * Returns true if this node is in the process of stepping down. Note that this can be - * due to an unconditional stepdown that must succeed (for instance from learning about a new - * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that - * could fail if not enough nodes are caught up). + * Gets the weighted average round trip time for heartbeat messages to the target. + * Returns 0 if there have been no pings recorded yet. */ - virtual bool isSteppingDown() const = 0; + Milliseconds getMillis() const { + return value == UninitializedPing ? Milliseconds(0) : value; + } /** - * Returns the address of the current sync source, or an empty HostAndPort if there is no - * current sync source. + * Gets the date at which start() was last called, which is used to determine if + * a heartbeat should be retried or if the time limit has expired. */ - virtual HostAndPort getSyncSourceAddress() const = 0; + Date_t getLastHeartbeatStartDate() const { + return _lastHeartbeatStartDate; + } /** - * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor - * ourself. + * Gets the number of failures since start() was last called. + * + * This value is incremented by calls to miss(), cleared by calls to start() and + * set to the maximum possible value by calls to hit(). */ - virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const = 0; + int getNumFailuresSinceLastStart() const { + return _numFailuresSinceLastStart; + } - /** - * Gets the earliest time the current node will stand for election. - */ - virtual Date_t getStepDownTime() const = 0; +private: + unsigned int count = 0; + Milliseconds value = UninitializedPing; + Date_t _lastHeartbeatStartDate; + int _numFailuresSinceLastStart = std::numeric_limits<int>::max(); +}; - /** - * Gets the current value of the maintenance mode counter. - */ - virtual int getMaintenanceCount() const = 0; +class TopologyCoordinatorImpl : public TopologyCoordinator { +public: + struct Options { + // A sync source is re-evaluated after it lags behind further than this amount. + Seconds maxSyncSourceLagSecs{0}; - /** - * Gets the latest term this member is aware of. If this member is the primary, - * it's the current term of the replica set. - */ - virtual long long getTerm() = 0; + // Whether or not this node is running as a config server. + ClusterRole clusterRole{ClusterRole::None}; + }; - enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm }; + /** + * Constructs a Topology Coordinator object. 
+ **/ + TopologyCoordinatorImpl(Options options); //////////////////////////////////////////////////////////// // - // Basic state manipulation methods. + // Implementation of TopologyCoordinator interface // //////////////////////////////////////////////////////////// - /** - * Sets the latest term this member is aware of to the higher of its current value and - * the value passed in as "term". - * Returns the result of setting the term value, or if a stepdown should be triggered. - */ - virtual UpdateTermResult updateTerm(long long term, Date_t now) = 0; - - /** - * Sets the index into the config used when we next choose a sync source - */ - virtual void setForceSyncSourceIndex(int index) = 0; - - enum class ChainingPreference { kAllowChaining, kUseConfiguration }; - - /** - * Chooses and sets a new sync source, based on our current knowledge of the world. - */ + virtual Role getRole() const; + virtual MemberState getMemberState() const; + virtual bool canAcceptWrites() const override; + virtual bool isSteppingDown() const override; + virtual HostAndPort getSyncSourceAddress() const; + virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; + virtual int getMaintenanceCount() const; + virtual long long getTerm(); + virtual UpdateTermResult updateTerm(long long term, Date_t now); + virtual void setForceSyncSourceIndex(int index); virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) = 0; - - /** - * Suppresses selecting "host" as sync source until "until". - */ - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0; - - /** - * Removes a single entry "host" from the list of potential sync sources which we - * have blacklisted, if it is supposed to be unblacklisted by "now". - */ - virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now) = 0; - - /** - * Clears the list of potential sync sources we have blacklisted. - */ - virtual void clearSyncSourceBlacklist() = 0; - - /** - * Determines if a new sync source should be chosen, if a better candidate sync source is - * available. If the current sync source's last optime ("syncSourceLastOpTime" under - * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than - * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are - * running in ProtocolVersion 1, our current sync source is not primary, has no sync source - * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. - * - * "now" is used to skip over currently blacklisted sync sources. - * - * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. - */ + ChainingPreference chainingPreference) override; + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); + virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const = 0; - - /** - * Checks whether we are a single node set and we are not in a stepdown period. If so, - * puts us into candidate mode, otherwise does nothing. This is used to ensure that - * nodes in a single node replset become primary again when their stepdown period ends. 
- */ - virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) = 0; - - /** - * Sets the earliest time the current node will stand for election to "newTime". - * - * Until this time, while the node may report itself as electable, it will not stand - * for election. - */ - virtual void setElectionSleepUntil(Date_t newTime) = 0; - - /** - * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or - * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the - * applier changes the reported member state of the current node, and enables or suppresses - * electability of the current node. All modes but RS_SECONDARY indicate an unelectable - * follower state (one that cannot transition to candidate). - */ - virtual void setFollowerMode(MemberState::MS newMode) = 0; - - /** - * Scan the memberData and determine the highest last applied or last - * durable optime present on a majority of servers; set _lastCommittedOpTime to this - * new entry. - * Whether the last applied or last durable op time is used depends on whether - * the config getWriteConcernMajorityShouldJournal is set. - * Returns true if the _lastCommittedOpTime was changed. - */ - virtual bool updateLastCommittedOpTime() = 0; - - /** - * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the - * current last committed OpTime. Returns true if _lastCommittedOpTime is changed. - */ - virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime) = 0; - - /** - * Returns the OpTime of the latest majority-committed op known to this server. - */ - virtual OpTime getLastCommittedOpTime() const = 0; - - /** - * Returns true if it's safe to transition to LeaderMode::kMaster. - */ - virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const = 0; - - /** - * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes - * and are ready to fully become primary and start accepting writes. - * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed - * for this tenure as primary. This prevents entries from before our election from counting as - * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed. - * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes. - */ - virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) = 0; - - /** - * Adjusts the maintenance mode count by "inc". - * - * It is an error to call this method if getRole() does not return Role::follower. - * It is an error to allow the maintenance count to go negative. - */ - virtual void adjustMaintenanceCountBy(int inc) = 0; - - //////////////////////////////////////////////////////////// - // - // Methods that prepare responses to command requests. 
- // - //////////////////////////////////////////////////////////// - - // produces a reply to a replSetSyncFrom command + Date_t now) const; + virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); + virtual void setElectionSleepUntil(Date_t newTime); + virtual void setFollowerMode(MemberState::MS newMode); + virtual bool updateLastCommittedOpTime(); + virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime); + virtual OpTime getLastCommittedOpTime() const; + virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override; + virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override; + virtual void adjustMaintenanceCountBy(int inc); virtual void prepareSyncFromResponse(const HostAndPort& target, BSONObjBuilder* response, - Status* result) = 0; - - // produce a reply to a replSetFresh command + Status* result); virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, Date_t now, BSONObjBuilder* response, - Status* result) = 0; - - // produce a reply to a received electCmd + Status* result); virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, Date_t now, BSONObjBuilder* response, - Status* result) = 0; - - // produce a reply to a heartbeat + Status* result); virtual Status prepareHeartbeatResponse(Date_t now, const ReplSetHeartbeatArgs& args, const std::string& ourSetName, - ReplSetHeartbeatResponse* response) = 0; - - // produce a reply to a V1 heartbeat + ReplSetHeartbeatResponse* response); virtual Status prepareHeartbeatResponseV1(Date_t now, const ReplSetHeartbeatArgsV1& args, const std::string& ourSetName, - ReplSetHeartbeatResponse* response) = 0; - - struct ReplSetStatusArgs { - Date_t now; - unsigned selfUptime; - const OpTime& readConcernMajorityOpTime; - const BSONObj& initialSyncStatus; - }; - - // produce a reply to a status request + ReplSetHeartbeatResponse* response); virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, - Status* result) = 0; - - // Produce a replSetUpdatePosition command to be sent to the node's sync source. + Status* result); virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, - OpTime currentCommittedSnapshotOpTime) const = 0; - - // produce a reply to an ismaster request. It is only valid to call this if we are a - // replset. - virtual void fillIsMasterForReplSet(IsMasterResponse* response) = 0; - - // Produce member data for the serverStatus command and diagnostic logging. - virtual void fillMemberData(BSONObjBuilder* result) = 0; - - enum class PrepareFreezeResponseResult { kNoAction, kElectSelf }; - - /** - * Produce a reply to a freeze request. Returns a PostMemberStateUpdateAction on success that - * may trigger state changes in the caller. - */ - virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse( - Date_t now, int secs, BSONObjBuilder* response) = 0; - - //////////////////////////////////////////////////////////// - // - // Methods for sending and receiving heartbeats, - // reconfiguring and handling the results of standing for - // election. - // - //////////////////////////////////////////////////////////// - - /** - * Updates the topology coordinator's notion of the replica set configuration. 
- * - * "newConfig" is the new configuration, and "selfIndex" is the index of this - * node's configuration information in "newConfig", or "selfIndex" is -1 to - * indicate that this node is not a member of "newConfig". - * - * newConfig.isInitialized() should be true, though implementations may accept - * configurations where this is not true, for testing purposes. - */ - virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) = 0; - - /** - * Prepares a heartbeat request appropriate for sending to "target", assuming the - * current time is "now". "ourSetName" is used as the name for our replica set if - * the topology coordinator does not have a valid configuration installed. - * - * The returned pair contains proper arguments for a replSetHeartbeat command, and - * an amount of time to wait for the response. - * - * This call should be paired (with intervening network communication) with a call to - * processHeartbeatResponse for the same "target". - */ + OpTime currentCommittedSnapshotOpTime) const; + + virtual void fillIsMasterForReplSet(IsMasterResponse* response); + virtual void fillMemberData(BSONObjBuilder* result); + virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, + int secs, + BSONObjBuilder* response); + virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( - Date_t now, const std::string& ourSetName, const HostAndPort& target) = 0; + Date_t now, const std::string& ourSetName, const HostAndPort& target); virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( - Date_t now, const std::string& ourSetName, const HostAndPort& target) = 0; - - /** - * Processes a heartbeat response from "target" that arrived around "now", having - * spent "networkRoundTripTime" millis on the network. - * - * Updates internal topology coordinator state, and returns instructions about what action - * to take next. - * - * If the next action indicates StartElection, the topology coordinator has transitioned to - * the "candidate" role, and will remain there until processWinElection or - * processLoseElection are called. - * - * If the next action indicates "StepDownSelf", the topology coordinator has transitioned - * to the "follower" role from "leader", and the caller should take any necessary actions - * to become a follower. - * - * If the next action indicates "StepDownRemotePrimary", the caller should take steps to - * cause the specified remote host to step down from primary to secondary. - * - * If the next action indicates "Reconfig", the caller should verify the configuration in - * hbResponse is acceptable, perform any other reconfiguration actions it must, and call - * updateConfig with the new configuration and the appropriate value for "selfIndex". It - * must also wrap up any outstanding elections (by calling processLoseElection or - * processWinElection) before calling updateConfig. - * - * This call should be paired (with intervening network communication) with a call to - * prepareHeartbeatRequest for the same "target". - */ + Date_t now, const std::string& ourSetName, const HostAndPort& target); virtual HeartbeatResponseAction processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, - const StatusWith<ReplSetHeartbeatResponse>& hbResponse) = 0; - - /** - * Returns whether or not at least 'numNodes' have reached the given opTime. 
- * "durablyWritten" indicates whether the operation has to be durably applied. - */ - virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, - int numNodes, - bool durablyWritten) = 0; - - /** - * Returns whether or not at least one node matching the tagPattern has reached - * the given opTime. - * "durablyWritten" indicates whether the operation has to be durably applied. - */ + const StatusWith<ReplSetHeartbeatResponse>& hbResponse); + virtual bool voteForMyself(Date_t now); + virtual void setElectionInfo(OID electionId, Timestamp electionOpTime); + virtual void processWinElection(OID electionId, Timestamp electionOpTime); + virtual void processLoseElection(); + virtual Status checkShouldStandForElection(Date_t now) const; + virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message); + virtual bool attemptStepDown(long long termAtStart, + Date_t now, + Date_t waitUntil, + Date_t stepDownUntil, + bool force) override; + virtual bool isSafeToStepDown() override; + virtual bool prepareForUnconditionalStepDown() override; + virtual Status prepareForStepDownAttempt() override; + virtual void abortAttemptedStepDownIfNeeded() override; + virtual void finishUnconditionalStepDown() override; + virtual Date_t getStepDownTime() const; + virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const; + virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const; + virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response); + virtual void summarizeAsHtml(ReplSetHtmlSummary* output); + virtual void loadLastVote(const LastVote& lastVote); + virtual void voteForMyselfV1(); + virtual void setPrimaryIndex(long long primaryIndex); + virtual int getCurrentPrimaryIndex() const; + virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime, const ReplSetTagPattern& tagPattern, - bool durablyWritten) = 0; - - /** - * Returns a vector of members that have applied the operation with OpTime 'op'. - * "durablyWritten" indicates whether the operation has to be durably applied. - * "skipSelf" means to exclude this node whether or not the op has been applied. - */ + bool durablyWritten); virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten, - bool skipSelf) = 0; + bool skipSelf); + virtual bool setMemberAsDown(Date_t now, const int memberIndex) override; + virtual std::pair<int, Date_t> getStalestLiveMember() const; + virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now); + virtual void resetAllMemberTimeouts(Date_t now); + virtual void resetMemberTimeouts(Date_t now, + const stdx::unordered_set<HostAndPort>& member_set); + virtual OpTime getMyLastAppliedOpTime() const; + virtual OpTime getMyLastDurableOpTime() const; + virtual MemberData* getMyMemberData(); + virtual MemberData* findMemberDataByMemberId(const int memberId); + virtual MemberData* findMemberDataByRid(const OID rid); + virtual MemberData* addSlaveMemberData(const OID rid); + virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason); + virtual void setStorageEngineSupportsReadCommitted(bool supported); - /** - * Marks a member as down from our perspective and returns a bool which indicates if we can no - * longer see a majority of the nodes and thus should step down. 
- */ - virtual bool setMemberAsDown(Date_t now, const int memberIndex) = 0; + virtual void restartHeartbeats(); - /** - * Goes through the memberData and determines which member that is currently live - * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are - * no other members. - */ - virtual std::pair<int, Date_t> getStalestLiveMember() const = 0; + virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; - /** - * Go through the memberData, and mark nodes which haven't been updated - * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which - * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction. - */ - virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now) = 0; + //////////////////////////////////////////////////////////// + // + // Test support methods + // + //////////////////////////////////////////////////////////// - /** - * Set all nodes in memberData to not stale with a lastUpdate of "now". - */ - virtual void resetAllMemberTimeouts(Date_t now) = 0; + // Changes _memberState to newMemberState. Only for testing. + void changeMemberState_forTest(const MemberState& newMemberState, + const Timestamp& electionTime = Timestamp(0, 0)); - /** - * Set all nodes in memberData that are present in member_set - * to not stale with a lastUpdate of "now". - */ - virtual void resetMemberTimeouts(Date_t now, - const stdx::unordered_set<HostAndPort>& member_set) = 0; + // Sets "_electionTime" to "newElectionTime". Only for testing. + void _setElectionTime(const Timestamp& newElectionTime); - /* - * Returns the last optime that this node has applied, whether or not it has been journaled. - */ - virtual OpTime getMyLastAppliedOpTime() const = 0; + // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests! + // TODO(spencer): Remove this once we can easily call for an election in unit tests to + // set the current primary. + void _setCurrentPrimaryForTest(int primaryIndex); - /* - * Returns the last optime that this node has applied and journaled. - */ - virtual OpTime getMyLastDurableOpTime() const = 0; + // Returns _electionTime. Only used in unittests. + Timestamp getElectionTime() const; - /* - * Returns information we have on the state of this node. - */ - virtual MemberData* getMyMemberData() = 0; + // Returns _electionId. Only used in unittests. + OID getElectionId() const; - /* - * Returns information we have on the state of the node identified by memberId. Returns - * nullptr if memberId is not found in the configuration. - */ - virtual MemberData* findMemberDataByMemberId(const int memberId) = 0; +private: + enum UnelectableReason { + None = 0, + CannotSeeMajority = 1 << 0, + NotCloseEnoughToLatestOptime = 1 << 1, + ArbiterIAm = 1 << 2, + NotSecondary = 1 << 3, + NoPriority = 1 << 4, + StepDownPeriodActive = 1 << 5, + NoData = 1 << 6, + NotInitialized = 1 << 7, + VotedTooRecently = 1 << 8, + RefusesToStand = 1 << 9, + NotCloseEnoughToLatestForPriorityTakeover = 1 << 10, + NotFreshEnoughForCatchupTakeover = 1 << 11, + }; + typedef int UnelectableReasonMask; - /* - * Returns information we have on the state of the node identified by rid. Returns - * nullptr if rid is not found in the heartbeat data. This method is used only for - * master/slave replication. - */ - virtual MemberData* findMemberDataByRid(const OID rid) = 0; - /* - * Adds and returns a memberData entry for the given RID. - * Used only in master/slave mode. 
- */ - virtual MemberData* addSlaveMemberData(const OID rid) = 0; + // Set what type of PRIMARY this node currently is. + void _setLeaderMode(LeaderMode mode); - /** - * If getRole() == Role::candidate and this node has not voted too recently, updates the - * lastVote tracker and returns true. Otherwise, returns false. - */ - virtual bool voteForMyself(Date_t now) = 0; + // Returns the number of heartbeat pings which have occurred. + int _getTotalPings(); - /** - * Sets lastVote to be for ourself in this term. - */ - virtual void voteForMyselfV1() = 0; + // Returns the current "ping" value for the given member by their address + Milliseconds _getPing(const HostAndPort& host); - /** - * Sets election id and election optime. - */ - virtual void setElectionInfo(OID electionId, Timestamp electionOpTime) = 0; + // Determines if we will veto the member specified by "args.id". + // If we veto, the errmsg will be filled in with a reason + bool _shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args, + const Date_t& now, + std::string* errmsg) const; - /** - * Performs state updates associated with winning an election. - * - * It is an error to call this if the topology coordinator is not in candidate mode. - * - * Exactly one of either processWinElection or processLoseElection must be called if - * processHeartbeatResponse returns StartElection, to exit candidate mode. - */ - virtual void processWinElection(OID electionId, Timestamp electionOpTime) = 0; + // Returns the index of the member with the matching id, or -1 if none match. + int _getMemberIndex(int id) const; - /** - * Performs state updates associated with losing an election. - * - * It is an error to call this if the topology coordinator is not in candidate mode. - * - * Exactly one of either processWinElection or processLoseElection must be called if - * processHeartbeatResponse returns StartElection, to exit candidate mode. - */ - virtual void processLoseElection() = 0; + // Sees if a majority number of votes are held by members who are currently "up" + bool _aMajoritySeemsToBeUp() const; - /** - * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used - * when we receive a stepdown command (which can fail if not enough secondaries are caught up) - * to ensure that we never process more than one stepdown request at a time. - * Returns OK if it is safe to continue with the stepdown attempt, or returns - * ConflictingOperationInProgess if this node is already processing a stepdown request of any - * kind. - */ - virtual Status prepareForStepDownAttempt() = 0; + // Checks if the node can see a healthy primary of equal or greater priority to the + // candidate. If so, returns the index of that node. Otherwise returns -1. + int _findHealthyPrimaryOfEqualOrGreaterPriority(const int candidateIndex) const; - /** - * If this node is still attempting to process a stepdown attempt, aborts the attempt and - * returns this node to normal primary/master state. If this node has already completed - * stepping down or is now in the process of handling an unconditional stepdown, then this - * method does nothing. - */ - virtual void abortAttemptedStepDownIfNeeded() = 0; + // Is otherOpTime close enough (within 10 seconds) to the latest known optime to qualify + // for an election + bool _isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const; - /** - * Tries to transition the coordinator from the leader role to the follower role. 
- * - * A step down succeeds based on the following conditions: - * - * C1. 'force' is true and now > waitUntil - * - * C2. A majority set of nodes, M, in the replica set have optimes greater than or - * equal to the last applied optime of the primary. - * - * C3. There exists at least one electable secondary node in the majority set M. - * - * - * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method - * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more - * time to pass could make it succeed, returns false. If the whole stepdown attempt should be - * abandoned (for example because the time limit expired or because we've already stepped down), - * throws an exception. - * TODO(spencer): Unify with the finishUnconditionalStepDown() method. - */ - virtual bool attemptStepDown( - long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) = 0; + // Is our optime close enough to the latest known optime to call for a priority takeover. + bool _amIFreshEnoughForPriorityTakeover() const; - /** - * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument. - * This is essentially checking conditions C2 and C3 as described in the comment to - * attemptStepDown(). - */ - virtual bool isSafeToStepDown() = 0; + // Is the primary node still in catchup mode and is our optime the latest + // known optime of all the up nodes. + bool _amIFreshEnoughForCatchupTakeover() const; - /** - * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process - * of an unconditional step down. If we are in the middle of a stepdown command attempt when - * this is called then this unconditional stepdown will supersede the stepdown attempt, which - * will cause the stepdown to fail. When this returns true it must be followed by a call to - * finishUnconditionalStepDown() that is called when holding the global X lock. - */ - virtual bool prepareForUnconditionalStepDown() = 0; + // Returns reason why "self" member is unelectable + UnelectableReasonMask _getMyUnelectableReason(const Date_t now, + StartElectionReason reason) const; - /** - * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the - * global exclusive lock so we can't actually stepdown at that moment. When that happens - * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown()) - * and schedule work to stepdown in the global X lock. This method is called after holding the - * global lock to perform the actual stepdown. - * TODO(spencer): Unify with the finishAttemptedStepDown() method. - */ - virtual void finishUnconditionalStepDown() = 0; + // Returns reason why memberIndex is unelectable + UnelectableReasonMask _getUnelectableReason(int memberIndex) const; - /** - * Considers whether or not this node should stand for election, and returns true - * if the node has transitioned to candidate role as a result of the call. 
- */ - virtual Status checkShouldStandForElection(Date_t now) const = 0; + // Returns the nice text of why the node is unelectable + std::string _getUnelectableReasonString(UnelectableReasonMask ur) const; - /** - * Set the outgoing heartbeat message from self - */ - virtual void setMyHeartbeatMessage(const Date_t now, const std::string& s) = 0; + // Return true if we are currently primary + bool _iAmPrimary() const; - /** - * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp - * information. - */ - virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const = 0; + // Scans through all members that are 'up' and return the latest known optime. + OpTime _latestKnownOpTime() const; - /** - * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary, - * lastOpApplied, and lastOpCommitted. - */ - virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const = 0; + // Scans the electable set and returns the highest priority member index + int _getHighestPriorityElectableIndex(Date_t now) const; - /** - * Writes into 'output' all the information needed to generate a summary of the current - * replication state for use by the web interface. - */ - virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0; + // Returns true if "one" member is higher priority than "two" member + bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const; - /** - * Prepares a ReplSetRequestVotesResponse. - */ - virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) = 0; + // Helper shortcut to self config + const MemberConfig& _selfConfig() const; - /** - * Loads an initial LastVote document, which was read from local storage. - * - * Called only during replication startup. All other updates are done internally. - */ - virtual void loadLastVote(const LastVote& lastVote) = 0; + // Helper shortcut to self member data + const MemberData& _selfMemberData() const; - /** - * Updates the current primary index. - */ - virtual void setPrimaryIndex(long long primaryIndex) = 0; + // Index of self member in member data. + const int _selfMemberDataIndex() const; - /** - * Returns the current primary index. - */ - virtual int getCurrentPrimaryIndex() const = 0; - - enum StartElectionReason { - kElectionTimeout, - kPriorityTakeover, - kStepUpRequest, - kCatchupTakeover - }; + // Returns NULL if there is no primary, or the MemberConfig* for the current primary + const MemberConfig* _currentPrimaryMember() const; /** - * Transitions to the candidate role if the node is electable. + * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an + * election or stepdown should commence. + * _updatePrimaryFromHBDataV1() is a simplified version of _updatePrimaryFromHBData() to be used + * when in ProtocolVersion1. */ - virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason) = 0; + HeartbeatResponseAction _updatePrimaryFromHBData(int updatedConfigIndex, + const MemberState& originalState, + Date_t now); + HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex, + const MemberState& originalState, + Date_t now); /** - * Updates the storage engine read committed support in the TopologyCoordinator options after - * creation. + * Updates _memberData based on the newConfig, ensuring that every member in the newConfig + * has an entry in _memberData. 
If any nodes in the newConfig are also present in + * _currentConfig, copies their heartbeat info into the corresponding entry in the updated + * _memberData vector. */ - virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0; + void _updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); /** - * Reset the booleans to record the last heartbeat restart. + * Returns whether a stepdown attempt should be allowed to proceed. See the comment for + * attemptStepDown() for more details on the rules of when stepdown attempts succeed or fail. */ - virtual void restartHeartbeats() = 0; + bool _canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force); - /** - * Scans through all members that are 'up' and return the latest known optime, if we have - * received (successful or failed) heartbeats from all nodes since heartbeat restart. - * - * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted - * heartbeats. - * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down. - */ - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0; + void _stepDownSelfAndReplaceWith(int newPrimary); -protected: - TopologyCoordinator() {} -}; - -/** - * Type that denotes the role of a node in the replication protocol. - * - * The role is distinct from MemberState, in that it only deals with the - * roles a node plays in the basic protocol -- leader, follower and candidate. - * The mapping between MemberState and Role is complex -- several MemberStates - * map to the follower role, and MemberState::RS_SECONDARY maps to either - * follower or candidate roles, e.g. - */ -class TopologyCoordinator::Role { -public: /** - * Constant indicating leader role. - */ - static const Role leader; + * Looks up the provided member in the blacklist and returns true if the member's blacklist + * expire time is after 'now'. If the member is found but the expire time is before 'now', + * the function returns false. If the member is not found in the blacklist, the function + * returns false. + **/ + bool _memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const; /** - * Constant indicating follower role. - */ - static const Role follower; - - /** - * Constant indicating candidate role - */ - static const Role candidate; - - Role() {} - - bool operator==(Role other) const { - return _value == other._value; - } - bool operator!=(Role other) const { - return _value != other._value; - } - - std::string toString() const; - -private: - explicit Role(int value); + * Returns true if we are a one-node replica set, we're the one member, + * we're electable, we're not in maintenance mode, and we are currently in followerMode + * SECONDARY. + * + * This is used to decide if we should transition to Role::candidate in a one-node replica set. + */ + bool _isElectableNodeInSingleNodeReplicaSet() const; + + // This node's role in the replication protocol. + Role _role; + + // This is a unique id that is generated and set each time we transition to PRIMARY, as the + // result of an election. + OID _electionId; + // The time at which the current PRIMARY was elected. + Timestamp _electionTime; + + // This node's election term. The term is used as part of the consensus algorithm to elect + // and maintain one primary (leader) node in the cluster. 
+ long long _term;
+
+ // the index of the member we currently believe is primary, if one exists, otherwise -1
+ int _currentPrimaryIndex;
+
+ // the hostandport we are currently syncing from
+ // empty if no sync source (we are primary, or we cannot connect to anyone yet)
+ HostAndPort _syncSource;
+ // These members are not chosen as sync sources for a period of time, due to connection
+ // issues with them
+ std::map<HostAndPort, Date_t> _syncSourceBlacklist;
+ // The next sync source to be chosen, requested via a replSetSyncFrom command
+ int _forceSyncSourceIndex;
+
+ // Options for this TopologyCoordinator
+ Options _options;
+
+ // "heartbeat message"
+ // sent in requestHeartbeat respond in field "hbm"
+ std::string _hbmsg;
+ Date_t _hbmsgTime; // when it was logged
+
+ // heartbeat msg to send to others; descriptive diagnostic info
+ std::string _getHbmsg(Date_t now) const;
+
+ int _selfIndex; // this node's index in _members and _currentConfig
+
+ ReplSetConfig _rsConfig; // The current config, including a vector of MemberConfigs
+
+ // Heartbeat, current applied/durable optime, and other state data for each member. It is
+ // guaranteed that this vector will be maintained in the same order as the MemberConfigs in
+ // _currentConfig, therefore the member config index can be used to index into this vector as
+ // well.
+ std::vector<MemberData> _memberData;
+
+ // Time when stepDown command expires
+ Date_t _stepDownUntil;
+
+ // A time before which this node will not stand for election.
+ // In protocol version 1, this is used to prevent running for election after seeing
+ // a new term.
+ Date_t _electionSleepUntil;
+
+ // OpTime of the latest committed operation.
+ OpTime _lastCommittedOpTime;
+
+ // OpTime representing our transition to PRIMARY and the start of our term.
+ // _lastCommittedOpTime cannot be set to an earlier OpTime.
+ OpTime _firstOpTimeOfMyTerm;
+
+ // The number of calls we have had to enter maintenance mode
+ int _maintenanceModeCalls;
+
+ // The sub-mode of follower that we are in. Legal values are RS_SECONDARY, RS_RECOVERING,
+ // RS_STARTUP2 (initial sync) and RS_ROLLBACK. Only meaningful if _role == Role::follower.
+ // Configured via setFollowerMode(). If the sub-mode is RS_SECONDARY, then the effective
+ // sub-mode is either RS_SECONDARY or RS_RECOVERING, depending on _maintenanceModeCalls.
+ // Rather than accessing this variable directly, one should use the getMemberState() method,
+ // which computes the replica set node state on the fly.
+ MemberState::MS _followerMode;
+
+ // What type of PRIMARY this node currently is. Don't set this directly, call _setLeaderMode
+ // instead.
+ LeaderMode _leaderMode = LeaderMode::kNotLeader;
+
+ typedef std::map<HostAndPort, PingStats> PingMap;
+ // Ping stats for each member by HostAndPort.
+ PingMap _pings;
+
+ // Last vote info from the election
+ struct VoteLease {
+ static const Seconds leaseTime;
+
+ Date_t when;
+ int whoId = -1;
+ HostAndPort whoHostAndPort;
+ } _voteLease;
+
+ // V1 last vote info for elections
+ LastVote _lastVote{OpTime::kInitialTerm, -1};
+
+ enum class ReadCommittedSupport {
+ kUnknown,
+ kNo,
+ kYes,
+ };
 
- int _value;
+ // Whether or not the storage engine supports read committed.
+ ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
 };
 
-//
-// Convenience method for unittest code. Please use accessors otherwise.
-// - -std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role); -std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result); - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp deleted file mode 100644 index 84f4ccbedfe..00000000000 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ /dev/null @@ -1,3239 +0,0 @@ -/** - * Copyright 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/topology_coordinator_impl.h" - -#include <limits> - -#include "mongo/db/audit.h" -#include "mongo/db/client.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/heartbeat_response_action.h" -#include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/repl_set_heartbeat_args.h" -#include "mongo/db/repl/repl_set_heartbeat_args_v1.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" -#include "mongo/db/repl/repl_set_html_summary.h" -#include "mongo/db/repl/repl_set_request_votes_args.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/repl/update_position_args.h" -#include "mongo/db/server_parameters.h" -#include "mongo/rpc/metadata/oplog_query_metadata.h" -#include "mongo/rpc/metadata/repl_set_metadata.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/hex.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace repl { -using std::vector; -const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30); - -// Controls how caught up in replication a secondary with higher priority than the current primary -// must be before it will call for a priority takeover election. 
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2); - -// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore -// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be -// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds. -MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs); - -namespace { - -template <typename T> -int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) { - return static_cast<int>(it - vec.begin()); -} - -// Maximum number of retries for a failed heartbeat. -const int kMaxHeartbeatRetries = 2; - -/** - * Returns true if the only up heartbeats are auth errors. - */ -bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) { - bool foundAuthError = false; - for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) { - if (indexOfIterator(hbdata, it) == selfIndex) { - continue; - } - - if (it->up()) { - return false; - } - - if (it->hasAuthIssue()) { - foundAuthError = true; - } - } - - return foundAuthError; -} - -void appendOpTime(BSONObjBuilder* bob, - const char* elemName, - const OpTime& opTime, - const long long pv) { - if (pv == 1) { - opTime.append(bob, elemName); - } else { - bob->append(elemName, opTime.getTimestamp()); - } -} -} // namespace - -void PingStats::start(Date_t now) { - _lastHeartbeatStartDate = now; - _numFailuresSinceLastStart = 0; -} - -void PingStats::hit(Milliseconds millis) { - _numFailuresSinceLastStart = std::numeric_limits<int>::max(); - ++count; - - value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5); -} - -void PingStats::miss() { - ++_numFailuresSinceLastStart; -} - -TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options) - : _role(Role::follower), - _term(OpTime::kUninitializedTerm), - _currentPrimaryIndex(-1), - _forceSyncSourceIndex(-1), - _options(std::move(options)), - _selfIndex(-1), - _maintenanceModeCalls(0), - _followerMode(MemberState::RS_STARTUP2) { - invariant(getMemberState() == MemberState::RS_STARTUP); - // Need an entry for self in the memberHearbeatData. - _memberData.emplace_back(); - _memberData.back().setIsSelf(true); -} - -TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const { - return _role; -} - -void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) { - invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); - _forceSyncSourceIndex = index; -} - -HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { - return _syncSource; -} - -HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, - const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) { - // If we are not a member of the current replica set configuration, no sync source is valid. 
- if (_selfIndex == -1) { - LOG(1) << "Cannot sync from any members because we are not in the replica set config"; - return HostAndPort(); - } - - // if we have a target we've requested to sync from, use it - if (_forceSyncSourceIndex != -1) { - invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); - _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); - _forceSyncSourceIndex = -1; - log() << "choosing sync source candidate by request: " << _syncSource; - std::string msg(str::stream() << "syncing from: " << _syncSource.toString() - << " by request"); - setMyHeartbeatMessage(now, msg); - return _syncSource; - } - - // wait for 2N pings (not counting ourselves) before choosing a sync target - int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings(); - - if (needMorePings > 0) { - OCCASIONALLY log() << "waiting for " << needMorePings - << " pings from other members before syncing"; - _syncSource = HostAndPort(); - return _syncSource; - } - - // If we are only allowed to sync from the primary, set that - if (chainingPreference == ChainingPreference::kUseConfiguration && - !_rsConfig.isChainingAllowed()) { - if (_currentPrimaryIndex == -1) { - LOG(1) << "Cannot select a sync source because chaining is" - " not allowed and primary is unknown/down"; - _syncSource = HostAndPort(); - return _syncSource; - } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { - LOG(1) << "Cannot select a sync source because chaining is not allowed and primary " - "member is blacklisted: " - << _currentPrimaryMember()->getHostAndPort(); - _syncSource = HostAndPort(); - return _syncSource; - } else if (_currentPrimaryIndex == _selfIndex) { - LOG(1) - << "Cannot select a sync source because chaining is not allowed and we are primary"; - _syncSource = HostAndPort(); - return _syncSource; - } else { - _syncSource = _currentPrimaryMember()->getHostAndPort(); - log() << "chaining not allowed, choosing primary as sync source candidate: " - << _syncSource; - std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString()); - setMyHeartbeatMessage(now, msg); - return _syncSource; - } - } - - // find the member with the lowest ping time that is ahead of me - - // choose a time that will exclude no candidates by default, in case we don't see a primary - OpTime oldestSyncOpTime; - - // Find primary's oplog time. Reject sync candidates that are more than - // _options.maxSyncSourceLagSecs seconds behind. - if (_currentPrimaryIndex != -1) { - OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime(); - - // Check if primaryOpTime is still close to 0 because we haven't received - // our first heartbeat from a new primary yet. - unsigned int maxLag = - static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs)); - if (primaryOpTime.getSecs() >= maxLag) { - oldestSyncOpTime = - OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm()); - } - } - - int closestIndex = -1; - - // Make two attempts, with less restrictive rules the second time. - // - // During the first attempt, we ignore those nodes that have a larger slave - // delay, hidden nodes or non-voting, and nodes that are excessively behind. - // - // For the second attempt include those nodes, in case those are the only ones we can reach. - // - // This loop attempts to set 'closestIndex', to select a viable candidate. 
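// A rough sketch of how the two passes below differ, using assumed numbers purely for
// illustration (a maxSyncSourceLagSecs of 30 seconds and a primary whose
// heartbeat-applied optime sits at t = 1000s, giving an oldestSyncOpTime of 970s):
// on the first pass a candidate is skipped if it is a non-voter while we are a voter,
// is hidden, has a larger slaveDelay than ours, or has a heartbeat-applied optime older
// than 970s. The second pass drops those four filters so that such a node can still be
// chosen when it is the only one reachable; the up, readable-state, buildIndexes,
// ahead-of-our-last-fetched-optime, latency, and blacklist checks apply on both passes.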
- for (int attempts = 0; attempts < 2; ++attempts) { - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); - it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - // Don't consider ourselves. - if (itIndex == _selfIndex) { - continue; - } - - const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex)); - - // Candidate must be up to be considered. - if (!it->up()) { - LOG(2) << "Cannot select sync source because it is not up: " - << itMemberConfig.getHostAndPort(); - continue; - } - // Candidate must be PRIMARY or SECONDARY state to be considered. - if (!it->getState().readable()) { - LOG(2) << "Cannot select sync source because it is not readable: " - << itMemberConfig.getHostAndPort(); - continue; - } - - // On the first attempt, we skip candidates that do not match these criteria. - if (attempts == 0) { - // Candidate must be a voter if we are a voter. - if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) { - LOG(2) << "Cannot select sync source because we are a voter and it is not: " - << itMemberConfig.getHostAndPort(); - continue; - } - // Candidates must not be hidden. - if (itMemberConfig.isHidden()) { - LOG(2) << "Cannot select sync source because it is hidden: " - << itMemberConfig.getHostAndPort(); - continue; - } - // Candidates cannot be excessively behind. - if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) { - LOG(2) << "Cannot select sync source because it is too far behind." - << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort() - << ": " << it->getHeartbeatAppliedOpTime() - << ", oldest acceptable optime: " << oldestSyncOpTime; - continue; - } - // Candidate must not have a configured delay larger than ours. - if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) { - LOG(2) << "Cannot select sync source with larger slaveDelay than ours: " - << itMemberConfig.getHostAndPort(); - continue; - } - } - // Candidate must build indexes if we build indexes, to be considered. - if (_selfConfig().shouldBuildIndexes()) { - if (!itMemberConfig.shouldBuildIndexes()) { - LOG(2) << "Cannot select sync source with shouldBuildIndex differences: " - << itMemberConfig.getHostAndPort(); - continue; - } - } - // only consider candidates that are ahead of where we are - if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) { - LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. " - << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON() - << ", latest oplog optime of sync candidate " - << itMemberConfig.getHostAndPort() << ": " - << it->getHeartbeatAppliedOpTime().toBSON(); - continue; - } - // Candidate cannot be more latent than anything we've already considered. - if ((closestIndex != -1) && - (_getPing(itMemberConfig.getHostAndPort()) > - _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) { - LOG(2) << "Cannot select sync source with higher latency than the best candidate: " - << itMemberConfig.getHostAndPort(); - - continue; - } - // Candidate cannot be blacklisted. 
- if (_memberIsBlacklisted(itMemberConfig, now)) { - LOG(1) << "Cannot select sync source which is blacklisted: " - << itMemberConfig.getHostAndPort(); - - continue; - } - // This candidate has passed all tests; set 'closestIndex' - closestIndex = itIndex; - } - if (closestIndex != -1) - break; // no need for second attempt - } - - if (closestIndex == -1) { - // Did not find any members to sync from - std::string msg("could not find member to sync from"); - // Only log when we had a valid sync source before - if (!_syncSource.empty()) { - log() << msg << rsLog; - } - setMyHeartbeatMessage(now, msg); - - _syncSource = HostAndPort(); - return _syncSource; - } - _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); - log() << "sync source candidate: " << _syncSource; - std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); - setMyHeartbeatMessage(now, msg); - return _syncSource; -} - -bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig, - Date_t now) const { - std::map<HostAndPort, Date_t>::const_iterator blacklisted = - _syncSourceBlacklist.find(memberConfig.getHostAndPort()); - if (blacklisted != _syncSourceBlacklist.end()) { - if (blacklisted->second > now) { - return true; - } - } - return false; -} - -void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { - LOG(2) << "blacklisting " << host << " until " << until.toString(); - _syncSourceBlacklist[host] = until; -} - -void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) { - std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host); - if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) { - LOG(2) << "unblacklisting " << host; - _syncSourceBlacklist.erase(hostItr); - } -} - -void TopologyCoordinatorImpl::clearSyncSourceBlacklist() { - _syncSourceBlacklist.clear(); -} - -void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, - BSONObjBuilder* response, - Status* result) { - response->append("syncFromRequested", target.toString()); - - if (_selfIndex == -1) { - *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync"); - return; - } - - const MemberConfig& selfConfig = _selfConfig(); - if (selfConfig.isArbiter()) { - *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync"); - return; - } - if (_selfIndex == _currentPrimaryIndex) { - *result = Status(ErrorCodes::NotSecondary, "primaries don't sync"); - return; - } - - ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd(); - int targetIndex = 0; - for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); - ++it) { - if (it->getHostAndPort() == target) { - targetConfig = it; - break; - } - ++targetIndex; - } - if (targetConfig == _rsConfig.membersEnd()) { - *result = Status(ErrorCodes::NodeNotFound, - str::stream() << "Could not find member \"" << target.toString() - << "\" in replica set"); - return; - } - if (targetIndex == _selfIndex) { - *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself"); - return; - } - if (targetConfig->isArbiter()) { - *result = Status(ErrorCodes::InvalidOptions, - str::stream() << "Cannot sync from \"" << target.toString() - << "\" because it is an arbiter"); - return; - } - if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) { - *result = Status(ErrorCodes::InvalidOptions, - str::stream() << "Cannot sync from \"" << 
target.toString() - << "\" because it does not build indexes"); - return; - } - - if (selfConfig.isVoter() && !targetConfig->isVoter()) { - *result = Status(ErrorCodes::InvalidOptions, - str::stream() << "Cannot sync from \"" << target.toString() - << "\" because it is not a voter"); - return; - } - - const MemberData& hbdata = _memberData.at(targetIndex); - if (hbdata.hasAuthIssue()) { - *result = - Status(ErrorCodes::Unauthorized, - str::stream() << "not authorized to communicate with " << target.toString()); - return; - } - if (hbdata.getHealth() == 0) { - *result = - Status(ErrorCodes::HostUnreachable, - str::stream() << "I cannot reach the requested member: " << target.toString()); - return; - } - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { - warning() << "attempting to sync from " << target << ", but its latest opTime is " - << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is " - << lastOpApplied.getSecs() << " so this may not work"; - response->append("warning", - str::stream() << "requested member \"" << target.toString() - << "\" is more than 10 seconds behind us"); - // not returning bad Status, just warning - } - - HostAndPort prevSyncSource = getSyncSourceAddress(); - if (!prevSyncSource.empty()) { - response->append("prevSyncTarget", prevSyncSource.toString()); - } - - setForceSyncSourceIndex(targetIndex); - *result = Status::OK(); -} - -void TopologyCoordinatorImpl::prepareFreshResponse( - const ReplicationCoordinator::ReplSetFreshArgs& args, - const Date_t now, - BSONObjBuilder* response, - Status* result) { - if (_rsConfig.getProtocolVersion() != 0) { - *result = Status(ErrorCodes::BadValue, - str::stream() << "replset: incompatible replset protocol version: " - << _rsConfig.getProtocolVersion()); - return; - } - - if (_selfIndex == -1) { - *result = Status(ErrorCodes::ReplicaSetNotFound, - "Cannot participate in elections because not initialized"); - return; - } - - if (args.setName != _rsConfig.getReplSetName()) { - *result = - Status(ErrorCodes::ReplicaSetNotFound, - str::stream() << "Wrong repl set name. 
Expected: " << _rsConfig.getReplSetName() - << ", received: " - << args.setName); - return; - } - - if (args.id == static_cast<unsigned>(_selfConfig().getId())) { - *result = Status(ErrorCodes::BadValue, - str::stream() << "Received replSetFresh command from member with the " - "same member ID as ourself: " - << args.id); - return; - } - - bool weAreFresher = false; - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - if (_rsConfig.getConfigVersion() > args.cfgver) { - log() << "replSet member " << args.who << " is not yet aware its cfg version " - << args.cfgver << " is stale"; - response->append("info", "config version stale"); - weAreFresher = true; - } - // check not only our own optime, but any other member we can reach - else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) { - weAreFresher = true; - } - response->appendDate("opTime", - Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL())); - response->append("fresher", weAreFresher); - - std::string errmsg; - bool doVeto = _shouldVetoMember(args, now, &errmsg); - response->append("veto", doVeto); - if (doVeto) { - response->append("errmsg", errmsg); - } - *result = Status::OK(); -} - -bool TopologyCoordinatorImpl::_shouldVetoMember( - const ReplicationCoordinator::ReplSetFreshArgs& args, - const Date_t& now, - std::string* errmsg) const { - if (_rsConfig.getConfigVersion() < args.cfgver) { - // We are stale; do not veto. - return false; - } - - const unsigned int memberID = args.id; - const int hopefulIndex = _getMemberIndex(memberID); - invariant(hopefulIndex != _selfIndex); - const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); - - if (hopefulIndex == -1) { - *errmsg = str::stream() << "replSet couldn't find member with id " << memberID; - return true; - } - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - if (_iAmPrimary() && - lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) { - // hbinfo is not updated for ourself, so if we are primary we have to check the - // primary's last optime separately - *errmsg = str::stream() << "I am already primary, " - << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " can try again once I've stepped down"; - return true; - } - - if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) && - (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >= - _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) { - // other members might be aware of more up-to-date nodes - *errmsg = - str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " is trying to elect itself but " - << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() - << " is already primary and more up-to-date"; - return true; - } - - if ((highestPriorityIndex != -1)) { - const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex); - const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex); - - if (priorityMember.getPriority() > hopefulMember.getPriority()) { - *errmsg = str::stream() << hopefulMember.getHostAndPort().toString() - << " has lower priority of " << hopefulMember.getPriority() - << " than " << priorityMember.getHostAndPort().toString() - << " which has a priority of " << priorityMember.getPriority(); - return true; - } - } - - UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex); - reason &= ~RefusesToStand; - if (reason) { - *errmsg = str::stream() << "I don't think " - << 
_rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " is electable because the " - << _getUnelectableReasonString(reason); - return true; - } - - return false; -} - -// produce a reply to a received electCmd -void TopologyCoordinatorImpl::prepareElectResponse( - const ReplicationCoordinator::ReplSetElectArgs& args, - const Date_t now, - BSONObjBuilder* response, - Status* result) { - if (_rsConfig.getProtocolVersion() != 0) { - *result = Status(ErrorCodes::BadValue, - str::stream() << "replset: incompatible replset protocol version: " - << _rsConfig.getProtocolVersion()); - return; - } - if (_selfIndex == -1) { - *result = Status(ErrorCodes::ReplicaSetNotFound, - "Cannot participate in election because not initialized"); - return; - } - - const long long myver = _rsConfig.getConfigVersion(); - const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); - - const MemberConfig* primary = _currentPrimaryMember(); - const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid); - const MemberConfig* highestPriority = - highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex); - - int vote = 0; - if (args.set != _rsConfig.getReplSetName()) { - log() << "replSet error received an elect request for '" << args.set - << "' but our set name is '" << _rsConfig.getReplSetName() << "'"; - } else if (myver < args.cfgver) { - // we are stale. don't vote - log() << "replSetElect not voting because our config version is stale. Our version: " - << myver << ", their version: " << args.cfgver; - } else if (myver > args.cfgver) { - // they are stale! - log() << "replSetElect command received stale config version # during election. " - "Our version: " - << myver << ", their version: " << args.cfgver; - vote = -10000; - } else if (!hopeful) { - log() << "replSetElect couldn't find member with id " << args.whoid; - vote = -10000; - } else if (_iAmPrimary()) { - log() << "I am already primary, " << hopeful->getHostAndPort().toString() - << " can try again once I've stepped down"; - vote = -10000; - } else if (primary) { - log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but " - << primary->getHostAndPort().toString() << " is already primary"; - vote = -10000; - } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) { - // TODO(spencer): What if the lower-priority member is more up-to-date? 
- log() << hopeful->getHostAndPort().toString() << " has lower priority than " - << highestPriority->getHostAndPort().toString(); - vote = -10000; - } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) { - log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for " - << _voteLease.whoHostAndPort.toString() << ' ' - << durationCount<Seconds>(now - _voteLease.when) << " secs ago"; - } else { - _voteLease.when = now; - _voteLease.whoId = args.whoid; - _voteLease.whoHostAndPort = hopeful->getHostAndPort(); - vote = _selfConfig().getNumVotes(); - invariant(hopeful->getId() == args.whoid); - if (vote > 0) { - log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " (" - << args.whoid << ')'; - } - } - - response->append("vote", vote); - response->append("round", args.round); - *result = Status::OK(); -} - -// produce a reply to a heartbeat -Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, - const ReplSetHeartbeatArgs& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response) { - if (args.getProtocolVersion() != 1) { - return Status(ErrorCodes::BadValue, - str::stream() << "replset: incompatible replset protocol version: " - << args.getProtocolVersion()); - } - - // Verify that replica set names match - const std::string rshb = args.getSetName(); - if (ourSetName != rshb) { - log() << "replSet set names do not match, ours: " << ourSetName - << "; remote node's: " << rshb; - response->noteMismatched(); - return Status(ErrorCodes::InconsistentReplicaSetNames, - str::stream() << "Our set name of " << ourSetName << " does not match name " - << rshb - << " reported by remote node"); - } - - const MemberState myState = getMemberState(); - if (_selfIndex == -1) { - if (myState.removed()) { - return Status(ErrorCodes::InvalidReplicaSetConfig, - "Our replica set configuration is invalid or does not include us"); - } - } else { - invariant(_rsConfig.getReplSetName() == args.getSetName()); - if (args.getSenderId() == _selfConfig().getId()) { - return Status(ErrorCodes::BadValue, - str::stream() << "Received heartbeat from member with the same " - "member ID as ourself: " - << args.getSenderId()); - } - } - - // This is a replica set - response->noteReplSet(); - - response->setSetName(ourSetName); - response->setState(myState.s); - if (myState.primary()) { - response->setElectionTime(_electionTime); - } - - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - const OpTime lastOpDurable = getMyLastDurableOpTime(); - - // Are we electable - response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)); - - // Heartbeat status message - response->setHbMsg(_getHbmsg(now)); - response->setTime(duration_cast<Seconds>(now - Date_t{})); - response->setAppliedOpTime(lastOpApplied); - response->setDurableOpTime(lastOpDurable); - - if (!_syncSource.empty()) { - response->setSyncingTo(_syncSource); - } - - if (!_rsConfig.isInitialized()) { - response->setConfigVersion(-2); - return Status::OK(); - } - - const long long v = _rsConfig.getConfigVersion(); - response->setConfigVersion(v); - // Deliver new config if caller's version is older than ours - if (v > args.getConfigVersion()) { - response->setConfig(_rsConfig); - } - - // Resolve the caller's id in our Member list - int from = -1; - if (v == args.getConfigVersion() && args.getSenderId() != -1) { - from = _getMemberIndex(args.getSenderId()); - } - if (from == -1) { - // Can't find the member, so 
we leave out the stateDisagreement field - return Status::OK(); - } - invariant(from != _selfIndex); - - // if we thought that this node is down, let it know - if (!_memberData.at(from).up()) { - response->noteStateDisagreement(); - } - - // note that we got a heartbeat from this node - _memberData.at(from).setLastHeartbeatRecv(now); - return Status::OK(); -} - -Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, - const ReplSetHeartbeatArgsV1& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response) { - // Verify that replica set names match - const std::string rshb = args.getSetName(); - if (ourSetName != rshb) { - log() << "replSet set names do not match, ours: " << ourSetName - << "; remote node's: " << rshb; - return Status(ErrorCodes::InconsistentReplicaSetNames, - str::stream() << "Our set name of " << ourSetName << " does not match name " - << rshb - << " reported by remote node"); - } - - const MemberState myState = getMemberState(); - if (_selfIndex == -1) { - if (myState.removed()) { - return Status(ErrorCodes::InvalidReplicaSetConfig, - "Our replica set configuration is invalid or does not include us"); - } - } else { - if (args.getSenderId() == _selfConfig().getId()) { - return Status(ErrorCodes::BadValue, - str::stream() << "Received heartbeat from member with the same " - "member ID as ourself: " - << args.getSenderId()); - } - } - - response->setSetName(ourSetName); - - response->setState(myState.s); - - if (myState.primary()) { - response->setElectionTime(_electionTime); - } - - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - const OpTime lastOpDurable = getMyLastDurableOpTime(); - response->setAppliedOpTime(lastOpApplied); - response->setDurableOpTime(lastOpDurable); - - if (_currentPrimaryIndex != -1) { - response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId()); - } - - response->setTerm(_term); - - if (!_syncSource.empty()) { - response->setSyncingTo(_syncSource); - } - - if (!_rsConfig.isInitialized()) { - response->setConfigVersion(-2); - return Status::OK(); - } - - const long long v = _rsConfig.getConfigVersion(); - response->setConfigVersion(v); - // Deliver new config if caller's version is older than ours - if (v > args.getConfigVersion()) { - response->setConfig(_rsConfig); - } - - // Resolve the caller's id in our Member list - int from = -1; - if (v == args.getConfigVersion() && args.getSenderId() != -1) { - from = _getMemberIndex(args.getSenderId()); - } - if (from == -1) { - return Status::OK(); - } - invariant(from != _selfIndex); - - // note that we got a heartbeat from this node - _memberData.at(from).setLastHeartbeatRecv(now); - return Status::OK(); -} - -int TopologyCoordinatorImpl::_getMemberIndex(int id) const { - int index = 0; - for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); - ++it, ++index) { - if (it->getId() == id) { - return index; - } - } - return -1; -} - -std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest( - Date_t now, const std::string& ourSetName, const HostAndPort& target) { - PingStats& hbStats = _pings[target]; - Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); - if (!_rsConfig.isInitialized() || - (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || - (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { - // This is either the first request ever for "target", or the heartbeat timeout has - // passed, so we're starting a "new" 
heartbeat. - hbStats.start(now); - alreadyElapsed = Milliseconds(0); - } - ReplSetHeartbeatArgs hbArgs; - hbArgs.setProtocolVersion(1); - hbArgs.setCheckEmpty(false); - if (_rsConfig.isInitialized()) { - hbArgs.setSetName(_rsConfig.getReplSetName()); - hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); - if (_selfIndex >= 0) { - const MemberConfig& me = _selfConfig(); - hbArgs.setSenderHost(me.getHostAndPort()); - hbArgs.setSenderId(me.getId()); - } - } else { - hbArgs.setSetName(ourSetName); - hbArgs.setConfigVersion(-2); - } - if (serverGlobalParams.featureCompatibility.getVersion() != - ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) { - hbArgs.setHeartbeatVersion(1); - } - - const Milliseconds timeoutPeriod( - _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis() - : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod}); - const Milliseconds timeout = timeoutPeriod - alreadyElapsed; - return std::make_pair(hbArgs, timeout); -} - -std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1( - Date_t now, const std::string& ourSetName, const HostAndPort& target) { - PingStats& hbStats = _pings[target]; - Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64()); - if (!_rsConfig.isInitialized() || - (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) || - (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) { - // This is either the first request ever for "target", or the heartbeat timeout has - // passed, so we're starting a "new" heartbeat. - hbStats.start(now); - alreadyElapsed = Milliseconds(0); - } - ReplSetHeartbeatArgsV1 hbArgs; - if (_rsConfig.isInitialized()) { - hbArgs.setSetName(_rsConfig.getReplSetName()); - hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); - if (_selfIndex >= 0) { - const MemberConfig& me = _selfConfig(); - hbArgs.setSenderId(me.getId()); - hbArgs.setSenderHost(me.getHostAndPort()); - } - hbArgs.setTerm(_term); - } else { - hbArgs.setSetName(ourSetName); - // Config version -2 is for uninitialized config. - hbArgs.setConfigVersion(-2); - hbArgs.setTerm(OpTime::kInitialTerm); - } - if (serverGlobalParams.featureCompatibility.getVersion() != - ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) { - hbArgs.setHeartbeatVersion(1); - } - - const Milliseconds timeoutPeriod( - _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis() - : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod}); - const Milliseconds timeout(timeoutPeriod - alreadyElapsed); - return std::make_pair(hbArgs, timeout); -} - -HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse( - Date_t now, - Milliseconds networkRoundTripTime, - const HostAndPort& target, - const StatusWith<ReplSetHeartbeatResponse>& hbResponse) { - const MemberState originalState = getMemberState(); - PingStats& hbStats = _pings[target]; - invariant(hbStats.getLastHeartbeatStartDate() != Date_t()); - const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) || - (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed); - if (!hbResponse.isOK()) { - if (isUnauthorized) { - hbStats.hit(networkRoundTripTime); - } else { - hbStats.miss(); - } - } else { - hbStats.hit(networkRoundTripTime); - // Log diagnostics. 
- if (hbResponse.getValue().isStateDisagreement()) { - LOG(1) << target << " thinks that we are down because they cannot send us heartbeats."; - } - } - - // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order - // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to - // see the majority of the cluster. - // - // Arbiters also decrease their heartbeat interval to at most half the election timeout period. - Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval(); - if (_rsConfig.getProtocolVersion() == 1) { - if (getMemberState().arbiter()) { - heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, - _rsConfig.getHeartbeatInterval()); - } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) { - heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, - _rsConfig.getHeartbeatInterval() / 4); - } - } - - const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); - Date_t nextHeartbeatStartDate; - // Determine next heartbeat start time. - if (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries && - alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod()) { - // There are still retries left, let's use one. - nextHeartbeatStartDate = now; - } else { - nextHeartbeatStartDate = now + heartbeatInterval; - } - - if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) { - const long long currentConfigVersion = - _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2; - const ReplSetConfig& newConfig = hbResponse.getValue().getConfig(); - if (newConfig.getConfigVersion() > currentConfigVersion) { - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - return nextAction; - } else { - // Could be we got the newer version before we got the response, or the - // target erroneously sent us one, even through it isn't newer. - if (newConfig.getConfigVersion() < currentConfigVersion) { - LOG(1) << "Config version from heartbeat was older than ours."; - } else { - LOG(2) << "Config from heartbeat response was same as ours."; - } - if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component, - ::mongo::LogstreamBuilder::severityCast(2))) { - LogstreamBuilder lsb = log(); - if (_rsConfig.isInitialized()) { - lsb << "Current config: " << _rsConfig.toBSON() << "; "; - } - lsb << "Config in heartbeat: " << newConfig.toBSON(); - } - } - } - - // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do, - // so return early. - if (!_rsConfig.isInitialized()) { - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - return nextAction; - } - // If we're not in the config, we don't need to respond to heartbeats. 
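// Aside: the heartbeat-interval selection and retry scheduling above reduce to the
// following sketch. std::chrono stands in for Milliseconds/Date_t, and the function
// name and maxRetries parameter are illustrative, not MongoDB's API.
#include <algorithm>
#include <chrono>

std::chrono::milliseconds delayUntilNextHeartbeat(bool isArbiter,
                                                  bool hasSyncSource,
                                                  bool isPrimary,
                                                  int failuresSinceStart,
                                                  int maxRetries,
                                                  std::chrono::milliseconds alreadyElapsed,
                                                  std::chrono::milliseconds hbInterval,
                                                  std::chrono::milliseconds electionTimeout,
                                                  std::chrono::milliseconds hbTimeout) {
    // Nodes without a sync source (and arbiters) heartbeat more often, but never more
    // often than half the election timeout allows.
    std::chrono::milliseconds interval = hbInterval;
    if (isArbiter) {
        interval = std::min(electionTimeout / 2, hbInterval);
    } else if (!hasSyncSource && !isPrimary) {
        interval = std::min(electionTimeout / 2, hbInterval / 4);
    }
    // While retries remain and the heartbeat timeout has not expired, retry immediately;
    // otherwise wait a full interval before the next attempt.
    if (failuresSinceStart <= maxRetries && alreadyElapsed < hbTimeout) {
        return std::chrono::milliseconds(0);
    }
    return interval;
}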
- if (_selfIndex == -1) { - LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target - << " -- current config: " << _rsConfig.toBSON(); - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - return nextAction; - } - const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target); - if (memberIndex == -1) { - LOG(1) << "Could not find " << target << " in current config so ignoring --" - " current config: " - << _rsConfig.toBSON(); - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - return nextAction; - } - - invariant(memberIndex != _selfIndex); - - MemberData& hbData = _memberData.at(memberIndex); - const MemberConfig member = _rsConfig.getMemberAt(memberIndex); - bool advancedOpTime = false; - if (!hbResponse.isOK()) { - if (isUnauthorized) { - hbData.setAuthIssue(now); - } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries || - alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod()) { - hbData.setDownValues(now, hbResponse.getStatus().reason()); - } else { - LOG(3) << "Bad heartbeat response from " << target << "; trying again; Retries left: " - << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; " - << alreadyElapsed << " have already elapsed"; - } - } else { - ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue()); - LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId() - << ", msg: " << hbr.getHbMsg(); - advancedOpTime = hbData.setUpValues(now, std::move(hbr)); - } - - HeartbeatResponseAction nextAction; - if (_rsConfig.getProtocolVersion() == 0) { - nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now); - } else { - nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now); - } - - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - nextAction.setAdvancedOpTime(advancedOpTime); - return nextAction; -} - -bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime, - int numNodes, - bool durablyWritten) { - // Replication progress that is for some reason ahead of us should not allow us to - // satisfy a write concern if we aren't caught up ourselves. - OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime(); - if (myOpTime < targetOpTime) { - return false; - } - - for (auto&& memberData : _memberData) { - const OpTime& memberOpTime = - durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime(); - if (memberOpTime >= targetOpTime) { - --numNodes; - } - - if (numNodes <= 0) { - return true; - } - } - return false; -} - -bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime, - const ReplSetTagPattern& tagPattern, - bool durablyWritten) { - ReplSetTagMatch matcher(tagPattern); - for (auto&& memberData : _memberData) { - const OpTime& memberOpTime = - durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime(); - if (memberOpTime >= opTime) { - // This node has reached the desired optime, now we need to check if it is a part - // of the tagPattern. 
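// Aside: the numeric write-concern test in haveNumNodesReachedOpTime() above boils down
// to the following sketch. uint64_t stands in for OpTime, and the member list includes
// our own entry, as _memberData does; names are illustrative only.
#include <cstdint>
#include <vector>

bool enoughMembersReached(uint64_t myOpTime,
                          const std::vector<uint64_t>& memberOpTimes,
                          uint64_t targetOpTime,
                          int numNodes) {
    // Remote progress ahead of us never satisfies a write concern we have not met ourselves.
    if (myOpTime < targetOpTime) {
        return false;
    }
    for (uint64_t opTime : memberOpTimes) {
        if (opTime >= targetOpTime) {
            --numNodes;
        }
        if (numNodes <= 0) {
            return true;
        }
    }
    return false;
}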
- int memberIndex = memberData.getConfigIndex(); - invariant(memberIndex >= 0); - const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex); - for (MemberConfig::TagIterator it = memberConfig.tagsBegin(); - it != memberConfig.tagsEnd(); - ++it) { - if (matcher.update(*it)) { - return true; - } - } - } - } - return false; -} - -HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) { - bool stepdown = false; - for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) { - auto& memberData = _memberData[memberIndex]; - if (!memberData.isSelf() && !memberData.lastUpdateStale() && - now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) { - memberData.markLastUpdateStale(); - if (_iAmPrimary()) { - stepdown = stepdown || setMemberAsDown(now, memberIndex); - } - } - } - if (stepdown) { - log() << "can't see a majority of the set, relinquishing primary"; - return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); - } - return HeartbeatResponseAction::makeNoAction(); -} - -std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op, - bool durablyWritten, - bool skipSelf) { - std::vector<HostAndPort> hosts; - for (const auto& memberData : _memberData) { - if (skipSelf && memberData.isSelf()) { - continue; - } - - if (durablyWritten) { - if (memberData.getLastDurableOpTime() < op) { - continue; - } - } else if (memberData.getLastAppliedOpTime() < op) { - continue; - } - - hosts.push_back(memberData.getHostAndPort()); - } - return hosts; -} - -bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) { - invariant(memberIndex != _selfIndex); - invariant(memberIndex != -1); - invariant(_currentPrimaryIndex == _selfIndex); - MemberData& hbData = _memberData.at(memberIndex); - hbData.setDownValues(now, "no response within election timeout period"); - - if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) { - return true; - } - - return false; -} - -std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const { - Date_t earliestDate = Date_t::max(); - int earliestMemberId = -1; - for (const auto& memberData : _memberData) { - if (memberData.isSelf()) { - continue; - } - if (memberData.lastUpdateStale()) { - // Already stale. 
- continue; - } - LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate(); - if (earliestDate > memberData.getLastUpdate()) { - earliestDate = memberData.getLastUpdate(); - earliestMemberId = memberData.getMemberId(); - } - } - LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate; - return std::make_pair(earliestMemberId, earliestDate); -} - -void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) { - for (auto&& memberData : _memberData) - memberData.updateLiveness(now); -} - -void TopologyCoordinatorImpl::resetMemberTimeouts( - Date_t now, const stdx::unordered_set<HostAndPort>& member_set) { - for (auto&& memberData : _memberData) { - if (member_set.count(memberData.getHostAndPort())) - memberData.updateLiveness(now); - } -} - -OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const { - return _selfMemberData().getLastAppliedOpTime(); -} - -OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const { - return _selfMemberData().getLastDurableOpTime(); -} - -MemberData* TopologyCoordinatorImpl::getMyMemberData() { - return &_memberData[_selfMemberDataIndex()]; -} - -MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) { - const int memberIndex = _getMemberIndex(memberId); - if (memberIndex >= 0) - return &_memberData[memberIndex]; - return nullptr; -} - -MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) { - for (auto& memberData : _memberData) { - if (memberData.getRid() == rid) - return &memberData; - } - return nullptr; -} - -MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) { - invariant(!_memberData.empty()); // Must always have our own entry first. - invariant(!_rsConfig.isInitialized()); // Used only for master-slave. - _memberData.emplace_back(); - auto* result = &_memberData.back(); - result->setRid(rid); - return result; -} - -HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( - int updatedConfigIndex, const MemberState& originalState, Date_t now) { - // - // Updates the local notion of which remote node, if any is primary. - // Start the priority takeover process if we are eligible. - // - - invariant(updatedConfigIndex != _selfIndex); - - // If we are missing from the config, do not participate in primary maintenance or election. - if (_selfIndex == -1) { - return HeartbeatResponseAction::makeNoAction(); - } - // If we are the primary, there must be no other primary, otherwise its higher term would - // have already made us step down. - if (_currentPrimaryIndex == _selfIndex) { - return HeartbeatResponseAction::makeNoAction(); - } - - // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex. - int primaryIndex = -1; - for (size_t i = 0; i < _memberData.size(); i++) { - const MemberData& member = _memberData.at(i); - if (member.getState().primary() && member.up()) { - if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) { - primaryIndex = i; - } - } - } - _currentPrimaryIndex = primaryIndex; - if (_currentPrimaryIndex == -1) { - return HeartbeatResponseAction::makeNoAction(); - } - - // Clear last heartbeat message on ourselves. - setMyHeartbeatMessage(now, ""); - - // Takeover when the replset is stable. - // - // Take over the primary only if the remote primary is in the latest term I know. - // This is done only when we get a heartbeat response from the primary. 
- // Otherwise, there must be an outstanding election, which may succeed or not, but - // the remote primary will become aware of that election eventually and step down. - if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) { - - // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled. - bool catchupTakeoverDisabled = - ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() || - ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay(); - if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() < - _memberData.at(_selfIndex).getLastAppliedOpTime())) { - LOG(2) << "I can take over the primary due to fresher data." - << " Current primary index: " << primaryIndex << " in term " - << _memberData.at(primaryIndex).getTerm() << "." - << " Current primary optime: " - << _memberData.at(primaryIndex).getLastAppliedOpTime() - << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime(); - - return HeartbeatResponseAction::makeCatchupTakeoverAction(); - } - - if (_rsConfig.getMemberAt(primaryIndex).getPriority() < - _rsConfig.getMemberAt(_selfIndex).getPriority()) { - LOG(4) << "I can take over the primary due to higher priority." - << " Current primary index: " << primaryIndex << " in term " - << _memberData.at(primaryIndex).getTerm(); - - return HeartbeatResponseAction::makePriorityTakeoverAction(); - } - } - return HeartbeatResponseAction::makeNoAction(); -} - -HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData( - int updatedConfigIndex, const MemberState& originalState, Date_t now) { - // This method has two interrelated responsibilities, performed in two phases. - // - // First, it updates the local notion of which remote node, if any is primary. In the - // process, it may request a remote primary to step down because there is a higher priority - // node waiting, or because the local node thinks it is primary and that it has a more - // recent electionTime. It may instead decide that the local node should step down itself, - // because a remote has a more recent election time. - // - // Second, if there is no remote primary, and the local node is not primary, it considers - // whether or not to stand for election. - invariant(updatedConfigIndex != _selfIndex); - - // We are missing from the config, so do not participate in primary maintenance or election. - if (_selfIndex == -1) { - return HeartbeatResponseAction::makeNoAction(); - } - - //////////////////// - // Phase 1 - //////////////////// - - // If we believe the node whose data was just updated is primary, confirm that - // the updated data supports that notion. If not, erase our notion of who is primary. - if (updatedConfigIndex == _currentPrimaryIndex) { - const MemberData& updatedHBData = _memberData.at(updatedConfigIndex); - if (!updatedHBData.up() || !updatedHBData.getState().primary()) { - _currentPrimaryIndex = -1; - } - } - - // If the current primary is not highest priority and up to date (within 10s), - // have them/me stepdown. 
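// Aside: the "up to date (within 10s)" test applied below is the closeness check from
// _isOpTimeCloseEnoughToLatestToElect(): only the seconds components are compared, and
// addition is used rather than subtracting from the latest optime, which could
// underflow for small values. A minimal sketch, names illustrative only:
#include <cstdint>

bool closeEnoughToLatestToElect(uint32_t candidateOpTimeSecs, uint32_t latestOpTimeSecs) {
    return candidateOpTimeSecs + 10 >= latestOpTimeSecs;
}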
- if (_currentPrimaryIndex != -1) { - // check if we should ask the primary (possibly ourselves) to step down - const int highestPriorityIndex = _getHighestPriorityElectableIndex(now); - if (highestPriorityIndex != -1) { - const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex); - const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex); - const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex - ? getMyLastAppliedOpTime() - : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime(); - - if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) && - _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) { - const OpTime latestOpTime = _latestKnownOpTime(); - - if (_iAmPrimary()) { - if (_leaderMode == LeaderMode::kSteppingDown) { - return HeartbeatResponseAction::makeNoAction(); - } - log() << "Stepping down self (priority " << currentPrimaryMember.getPriority() - << ") because " << highestPriorityMember.getHostAndPort() - << " has higher priority " << highestPriorityMember.getPriority() - << " and is only " - << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs()) - << " seconds behind me"; - const Date_t until = - now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval(); - if (_electionSleepUntil < until) { - _electionSleepUntil = until; - } - return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); - } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) { - // If this node is the highest priority node, and it is not in - // an inter-election sleep period, ask the current primary to step down. - // This is an optimization, because the remote primary will almost certainly - // notice this node's electability promptly, via its own heartbeat process. - log() << "Requesting that " << currentPrimaryMember.getHostAndPort() - << " (priority " << currentPrimaryMember.getPriority() - << ") step down because I have higher priority " - << highestPriorityMember.getPriority() << " and am only " - << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs()) - << " seconds behind it"; - int primaryIndex = _currentPrimaryIndex; - _currentPrimaryIndex = -1; - return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex); - } - } - } - } - - // Scan the member list's heartbeat data for who is primary, and update - // _currentPrimaryIndex and _role, or request a remote to step down, as necessary. - { - int remotePrimaryIndex = -1; - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); - it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - if (itIndex == _selfIndex) { - continue; - } - - if (it->getState().primary() && it->up()) { - if (remotePrimaryIndex != -1) { - // two other nodes think they are primary (asynchronously polled) - // -- wait for things to settle down. - warning() << "two remote primaries (transiently)"; - return HeartbeatResponseAction::makeNoAction(); - } - remotePrimaryIndex = itIndex; - } - } - - if (remotePrimaryIndex != -1) { - // If it's the same as last time, don't do anything further. - if (_currentPrimaryIndex == remotePrimaryIndex) { - return HeartbeatResponseAction::makeNoAction(); - } - // Clear last heartbeat message on ourselves (why?) - setMyHeartbeatMessage(now, ""); - - // If we are also primary, this is a problem. Determine who should step down. 
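// Aside: when two primaries are seen, the tie is broken by election time (a Timestamp,
// compared here as a (seconds, increment) pair): the node elected more recently keeps
// primacy and the other is asked to step down. Illustrative sketch only:
#include <cstdint>
#include <utility>

bool localPrimaryShouldStepDown(std::pair<uint32_t, uint32_t> myElectionTime,
                                std::pair<uint32_t, uint32_t> remoteElectionTime) {
    // std::pair compares lexicographically, matching the (secs, inc) ordering.
    return remoteElectionTime > myElectionTime;
}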
- if (_iAmPrimary()) { - Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime(); - log() << "another primary seen with election time " << remoteElectionTime - << " my election time is " << _electionTime; - - // Step down whomever has the older election time. - if (remoteElectionTime > _electionTime) { - if (_leaderMode == LeaderMode::kSteppingDown) { - return HeartbeatResponseAction::makeNoAction(); - } - log() << "stepping down; another primary was elected more recently"; - return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); - } else { - log() << "another PRIMARY detected and it should step down" - " since it was elected earlier than me"; - return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex); - } - } - - _currentPrimaryIndex = remotePrimaryIndex; - return HeartbeatResponseAction::makeNoAction(); - } - } - - //////////////////// - // Phase 2 - //////////////////// - - // We do not believe any remote to be primary. - - // If we are primary, check if we can still see majority of the set; - // stepdown if we can't. - if (_iAmPrimary()) { - if (CannotSeeMajority & - _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) { - if (_leaderMode == LeaderMode::kSteppingDown) { - return HeartbeatResponseAction::makeNoAction(); - } - log() << "can't see a majority of the set, relinquishing primary"; - return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex); - } - - LOG(2) << "Choosing to remain primary"; - return HeartbeatResponseAction::makeNoAction(); - } - - fassert(18505, _currentPrimaryIndex == -1); - - const MemberState currentState = getMemberState(); - if (originalState.recovering() && currentState.secondary()) { - // We just transitioned from RECOVERING to SECONDARY, this can only happen if we - // received a heartbeat with an auth error when previously all the heartbeats we'd - // received had auth errors. In this case, don't return makeElectAction() because - // that could cause the election to start before the ReplicationCoordinator has updated - // its notion of the member state to SECONDARY. Instead return noAction so that the - // ReplicationCooridinator knows to update its tracking of the member state off of the - // TopologyCoordinator, and leave starting the election until the next heartbeat comes - // back. - return HeartbeatResponseAction::makeNoAction(); - } - - // At this point, there is no primary anywhere. Check to see if we should become a candidate. - const auto status = checkShouldStandForElection(now); - if (!status.isOK()) { - // NOTE: This log line is checked in unit test(s). 
- LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason(); - return HeartbeatResponseAction::makeNoAction(); - } - fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout)); - return HeartbeatResponseAction::makeElectAction(); -} - -Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const { - if (_currentPrimaryIndex != -1) { - return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"}; - } - invariant(_role != Role::leader); - - if (_role == Role::candidate) { - return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; - } - - const UnelectableReasonMask unelectableReason = - _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout); - if (NotCloseEnoughToLatestOptime & unelectableReason) { - return {ErrorCodes::NodeNotElectable, - str::stream() << "Not standing for election because " - << _getUnelectableReasonString(unelectableReason) - << "; my last optime is " - << getMyLastAppliedOpTime().toString() - << " and the newest is " - << _latestKnownOpTime().toString()}; - } - if (unelectableReason) { - return {ErrorCodes::NodeNotElectable, - str::stream() << "Not standing for election because " - << _getUnelectableReasonString(unelectableReason)}; - } - if (_electionSleepUntil > now) { - if (_rsConfig.getProtocolVersion() == 1) { - return { - ErrorCodes::NodeNotElectable, - str::stream() << "Not standing for election before " - << dateToISOStringLocal(_electionSleepUntil) - << " because I stood up or learned about a new term too recently"}; - } else { - return {ErrorCodes::NodeNotElectable, - str::stream() << "Not standing for election before " - << dateToISOStringLocal(_electionSleepUntil) - << " because I stood too recently"}; - } - } - // All checks passed. Start election proceedings. - return Status::OK(); -} - -bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const { - int vUp = 0; - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - if (itIndex == _selfIndex || it->up()) { - vUp += _rsConfig.getMemberAt(itIndex).getNumVotes(); - } - } - - return vUp * 2 > _rsConfig.getTotalVotingMembers(); -} - -int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority( - const int candidateIndex) const { - const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority(); - for (auto it = _memberData.begin(); it != _memberData.end(); ++it) { - if (!it->up() || it->getState() != MemberState::RS_PRIMARY) { - continue; - } - const int itIndex = indexOfIterator(_memberData, it); - const double priority = _rsConfig.getMemberAt(itIndex).getPriority(); - if (itIndex != candidateIndex && priority >= candidatePriority) { - return itIndex; - } - } - - return -1; -} - -bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const { - const OpTime latestKnownOpTime = _latestKnownOpTime(); - // Use addition instead of subtraction to avoid overflow. - return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs()); -} - -bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const { - const OpTime latestKnownOpTime = _latestKnownOpTime(); - - // Rules are: - // - If the terms don't match, we don't call for priority takeover. 
- // - If our optime and the latest optime happen in different seconds, our optime must be within - // at least priorityTakeoverFreshnessWindowSeconds seconds of the latest optime. - // - If our optime and the latest optime happen in the same second, our optime must be within - // at least 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp - // must be within 1000). This is to handle the case where a primary had its clock set far into - // the future, took some writes, then had its clock set back. In that case the timestamp - // component of all future oplog entries generated will be the same, until real world time - // passes the timestamp component of the last oplog entry. - - const OpTime ourLastOpApplied = getMyLastAppliedOpTime(); - if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) { - return false; - } - - if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) { - return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >= - latestKnownOpTime.getTimestamp().getSecs(); - } else { - return ourLastOpApplied.getTimestamp().getInc() + 1000 >= - latestKnownOpTime.getTimestamp().getInc(); - } -} - -bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const { - - const OpTime latestKnownOpTime = _latestKnownOpTime(); - - // Rules are: - // - We must have the freshest optime of all the up nodes. - // - We must specifically have a fresher optime than the primary (can't be equal). - // - The term of our last applied op must be less than the current term. This ensures that no - // writes have happened since the most recent election and that the primary is still in - // catchup mode. - - // There is no point to a catchup takeover if we aren't the freshest node because - // another node would immediately perform another catchup takeover when we become primary. - const OpTime ourLastOpApplied = getMyLastAppliedOpTime(); - if (ourLastOpApplied < latestKnownOpTime) { - return false; - } - - if (_currentPrimaryIndex == -1) { - return false; - } - - // If we aren't ahead of the primary, there is no point to having a catchup takeover. - const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime(); - - if (ourLastOpApplied <= primaryLastOpApplied) { - return false; - } - - // If the term of our last applied op is less than the current term, the primary didn't write - // anything and it is still in catchup mode. - return ourLastOpApplied.getTerm() < _term; -} - -bool TopologyCoordinatorImpl::_iAmPrimary() const { - if (_role == Role::leader) { - invariant(_currentPrimaryIndex == _selfIndex); - invariant(_leaderMode != LeaderMode::kNotLeader); - return true; - } - return false; -} - -OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const { - OpTime latest = getMyLastAppliedOpTime(); - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); - ++it) { - // Ignore self - // TODO(russotto): Simplify when heartbeat and spanning tree times are combined. - if (it->isSelf()) { - continue; - } - // Ignore down members - if (!it->up()) { - continue; - } - // Ignore removed nodes (not in config, so not valid). 
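// Aside: the priority-takeover freshness rules above amount to the following sketch.
// Plain integers stand in for OpTime's term/timestamp fields, 'windowSecs' corresponds
// to priorityTakeoverFreshnessWindowSeconds, and the names are illustrative only.
#include <cstdint>

bool freshEnoughForPriorityTakeover(uint64_t myTerm, uint32_t mySecs, uint32_t myInc,
                                    uint64_t latestTerm, uint32_t latestSecs, uint32_t latestInc,
                                    uint32_t windowSecs) {
    if (myTerm != latestTerm) {
        return false;  // never call for a priority takeover across terms
    }
    if (mySecs != latestSecs) {
        return mySecs + windowSecs >= latestSecs;  // different seconds: stay within the window
    }
    return myInc + 1000 >= latestInc;  // same second: stay within 1000 oplog entries
}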
- if (it->getState().removed()) { - continue; - } - - OpTime optime = it->getHeartbeatAppliedOpTime(); - - if (optime > latest) { - latest = optime; - } - } - - return latest; -} - -bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex, - int memberTwoIndex) const { - if (memberOneIndex == -1) - return false; - - if (memberTwoIndex == -1) - return true; - - return _rsConfig.getMemberAt(memberOneIndex).getPriority() > - _rsConfig.getMemberAt(memberTwoIndex).getPriority(); -} - -int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const { - int maxIndex = -1; - for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) { - UnelectableReasonMask reason = currentIndex == _selfIndex - ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout) - : _getUnelectableReason(currentIndex); - if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) { - maxIndex = currentIndex; - } - } - - return maxIndex; -} - -bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() { - if (_leaderMode == LeaderMode::kSteppingDown) { - // Can only be processing one required stepdown at a time. - return false; - } - // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so - // it's safe to transition from kAttemptingStepDown to kSteppingDown. - _setLeaderMode(LeaderMode::kSteppingDown); - return true; -} - -Status TopologyCoordinatorImpl::prepareForStepDownAttempt() { - if (_leaderMode == LeaderMode::kSteppingDown || - _leaderMode == LeaderMode::kAttemptingStepDown) { - return Status{ErrorCodes::ConflictingOperationInProgress, - "This node is already in the process of stepping down"}; - } - _setLeaderMode(LeaderMode::kAttemptingStepDown); - return Status::OK(); -} - -void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() { - if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) { - _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster); - } -} - -void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState, - const Timestamp& electionTime) { - invariant(_selfIndex != -1); - if (newMemberState == getMemberState()) - return; - switch (newMemberState.s) { - case MemberState::RS_PRIMARY: - _role = Role::candidate; - processWinElection(OID(), electionTime); - invariant(_role == Role::leader); - break; - case MemberState::RS_SECONDARY: - case MemberState::RS_ROLLBACK: - case MemberState::RS_RECOVERING: - case MemberState::RS_STARTUP2: - _role = Role::follower; - _followerMode = newMemberState.s; - if (_currentPrimaryIndex == _selfIndex) { - _currentPrimaryIndex = -1; - _setLeaderMode(LeaderMode::kNotLeader); - } - break; - case MemberState::RS_STARTUP: - updateConfig(ReplSetConfig(), -1, Date_t()); - break; - default: - severe() << "Cannot switch to state " << newMemberState; - invariant(false); - } - if (getMemberState() != newMemberState.s) { - severe() << "Expected to enter state " << newMemberState << " but am now in " - << getMemberState(); - invariant(false); - } - log() << newMemberState; -} - -void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { - if (primaryIndex == _selfIndex) { - changeMemberState_forTest(MemberState::RS_PRIMARY); - } else { - if (_iAmPrimary()) { - changeMemberState_forTest(MemberState::RS_SECONDARY); - } - if (primaryIndex != -1) { - ReplSetHeartbeatResponse hbResponse; - hbResponse.setState(MemberState::RS_PRIMARY); - hbResponse.setElectionTime(Timestamp()); - 
hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime()); - hbResponse.setSyncingTo(HostAndPort()); - hbResponse.setHbMsg(""); - _memberData.at(primaryIndex) - .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(), - std::move(hbResponse)); - } - _currentPrimaryIndex = primaryIndex; - } -} - -const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const { - if (_currentPrimaryIndex == -1) - return NULL; - - return &(_rsConfig.getMemberAt(_currentPrimaryIndex)); -} - -void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, - BSONObjBuilder* response, - Status* result) { - // output for each member - vector<BSONObj> membersOut; - const MemberState myState = getMemberState(); - const Date_t now = rsStatusArgs.now; - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - const OpTime lastOpDurable = getMyLastDurableOpTime(); - const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus; - - if (_selfIndex == -1) { - // We're REMOVED or have an invalid config - response->append("state", static_cast<int>(myState.s)); - response->append("stateStr", myState.toString()); - response->append("uptime", rsStatusArgs.selfUptime); - - appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); - - response->appendDate("optimeDate", - Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); - if (_maintenanceModeCalls) { - response->append("maintenanceMode", _maintenanceModeCalls); - } - std::string s = _getHbmsg(now); - if (!s.empty()) - response->append("infoMessage", s); - *result = Status(ErrorCodes::InvalidReplicaSetConfig, - "Our replica set config is invalid or we are not a member of it"); - return; - } - - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - if (itIndex == _selfIndex) { - // add self - BSONObjBuilder bb; - bb.append("_id", _selfConfig().getId()); - bb.append("name", _selfConfig().getHostAndPort().toString()); - bb.append("health", 1.0); - bb.append("state", static_cast<int>(myState.s)); - bb.append("stateStr", myState.toString()); - bb.append("uptime", rsStatusArgs.selfUptime); - if (!_selfConfig().isArbiter()) { - appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); - bb.appendDate("optimeDate", - Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); - } - - if (!_syncSource.empty() && !_iAmPrimary()) { - bb.append("syncingTo", _syncSource.toString()); - } - - if (_maintenanceModeCalls) { - bb.append("maintenanceMode", _maintenanceModeCalls); - } - - std::string s = _getHbmsg(now); - if (!s.empty()) - bb.append("infoMessage", s); - - if (myState.primary()) { - bb.append("electionTime", _electionTime); - bb.appendDate("electionDate", - Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs()))); - } - bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion()); - bb.append("self", true); - membersOut.push_back(bb.obj()); - } else { - // add non-self member - const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex); - BSONObjBuilder bb; - bb.append("_id", itConfig.getId()); - bb.append("name", itConfig.getHostAndPort().toString()); - double h = it->getHealth(); - bb.append("health", h); - const MemberState state = it->getState(); - bb.append("state", static_cast<int>(state.s)); - if (h == 0) { - // if we can't connect the state info is from the past - // and could be confusing to show - 
bb.append("stateStr", "(not reachable/healthy)"); - } else { - bb.append("stateStr", it->getState().toString()); - } - - const unsigned int uptime = static_cast<unsigned int>(( - it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0)); - bb.append("uptime", uptime); - if (!itConfig.isArbiter()) { - appendOpTime( - &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion()); - appendOpTime(&bb, - "optimeDurable", - it->getHeartbeatDurableOpTime(), - _rsConfig.getProtocolVersion()); - - bb.appendDate("optimeDate", - Date_t::fromDurationSinceEpoch( - Seconds(it->getHeartbeatAppliedOpTime().getSecs()))); - bb.appendDate("optimeDurableDate", - Date_t::fromDurationSinceEpoch( - Seconds(it->getHeartbeatDurableOpTime().getSecs()))); - } - bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); - bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); - Milliseconds ping = _getPing(itConfig.getHostAndPort()); - if (ping != UninitializedPing) { - bb.append("pingMs", durationCount<Milliseconds>(ping)); - std::string s = it->getLastHeartbeatMsg(); - if (!s.empty()) - bb.append("lastHeartbeatMessage", s); - } - if (it->hasAuthIssue()) { - bb.append("authenticated", false); - } - const HostAndPort& syncSource = it->getSyncSource(); - if (!syncSource.empty() && !state.primary()) { - bb.append("syncingTo", syncSource.toString()); - } - - if (state == MemberState::RS_PRIMARY) { - bb.append("electionTime", it->getElectionTime()); - bb.appendDate( - "electionDate", - Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs()))); - } - bb.appendIntOrLL("configVersion", it->getConfigVersion()); - membersOut.push_back(bb.obj()); - } - } - - // sort members bson - sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan()); - - response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : ""); - response->append("date", now); - response->append("myState", myState.s); - response->append("term", _term); - - // Add sync source info - if (!_syncSource.empty() && !myState.primary() && !myState.removed()) { - response->append("syncingTo", _syncSource.toString()); - } - - if (_rsConfig.isConfigServer()) { - response->append("configsvr", true); - } - - response->append("heartbeatIntervalMillis", - durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval())); - - // New optimes, to hold them all. - BSONObjBuilder optimes; - _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime"); - if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) { - rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime"); - } - - appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion()); - appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion()); - response->append("optimes", optimes.obj()); - - if (!initialSyncStatus.isEmpty()) { - response->append("initialSyncStatus", initialSyncStatus); - } - - response->append("members", membersOut); - *result = Status::OK(); -} - -StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand( - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, - OpTime currentCommittedSnapshotOpTime) const { - BSONObjBuilder cmdBuilder; - invariant(_rsConfig.isInitialized()); - // Do not send updates if we have been removed from the config. 
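// Aside: after the removed-from-config check below, the replSetUpdatePosition body gets
// one entry per member we still consider live. A sketch of that filter, with a plain
// struct standing in for MemberData (names illustrative only):
#include <vector>

struct MemberProgressSketch {
    bool isSelf;
    bool lastUpdateStale;
    bool hasReportedOpTime;  // false until the member has reported an applied optime
    int memberId;
};

std::vector<int> membersToReport(const std::vector<MemberProgressSketch>& members) {
    std::vector<int> ids;
    for (const auto& m : members) {
        if (!m.hasReportedOpTime) {
            continue;  // never heard from this member yet
        }
        if (!m.isSelf && m.lastUpdateStale) {
            continue;  // considered down; do not report stale progress
        }
        ids.push_back(m.memberId);
    }
    return ids;
}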
- if (_selfIndex == -1) { - return Status(ErrorCodes::NodeNotFound, - "This node is not in the current replset configuration."); - } - cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); - // Create an array containing objects each live member connected to us and for ourself. - BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes")); - for (const auto& memberData : _memberData) { - if (memberData.getLastAppliedOpTime().isNull()) { - // Don't include info on members we haven't heard from yet. - continue; - } - // Don't include members we think are down. - if (!memberData.isSelf() && memberData.lastUpdateStale()) { - continue; - } - - BSONObjBuilder entry(arrayBuilder.subobjStart()); - switch (commandStyle) { - case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle: - memberData.getLastDurableOpTime().append( - &entry, UpdatePositionArgs::kDurableOpTimeFieldName); - memberData.getLastAppliedOpTime().append( - &entry, UpdatePositionArgs::kAppliedOpTimeFieldName); - break; - case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle: - entry.append("_id", memberData.getRid()); - if (_rsConfig.getProtocolVersion() == 1) { - memberData.getLastDurableOpTime().append(&entry, "optime"); - } else { - entry.append("optime", memberData.getLastDurableOpTime().getTimestamp()); - } - break; - } - entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId()); - entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); - } - arrayBuilder.done(); - - // Add metadata to command. Old style parsing logic will reject the metadata. - if (commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) { - prepareReplSetMetadata(currentCommittedSnapshotOpTime) - .writeToMetadata(&cmdBuilder) - .transitional_ignore(); - } - return cmdBuilder.obj(); -} - -void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) { - BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); - { - for (const auto& memberData : _memberData) { - BSONObjBuilder entry(replicationProgress.subobjStart()); - entry.append("rid", memberData.getRid()); - const auto lastDurableOpTime = memberData.getLastDurableOpTime(); - if (_rsConfig.getProtocolVersion() == 1) { - BSONObjBuilder opTime(entry.subobjStart("optime")); - opTime.append("ts", lastDurableOpTime.getTimestamp()); - opTime.append("term", lastDurableOpTime.getTerm()); - opTime.done(); - } else { - entry.append("optime", lastDurableOpTime.getTimestamp()); - } - entry.append("host", memberData.getHostAndPort().toString()); - if (_selfIndex >= 0) { - const int memberId = memberData.getMemberId(); - invariant(memberId >= 0); - entry.append("memberId", memberId); - } - } - } -} - -void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { - const MemberState myState = getMemberState(); - if (!_rsConfig.isInitialized()) { - response->markAsNoConfig(); - return; - } - - for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); - ++it) { - if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) { - continue; - } - - if (it->isElectable()) { - response->addHost(it->getHostAndPort()); - } else if (it->isArbiter()) { - response->addArbiter(it->getHostAndPort()); - } else { - response->addPassive(it->getHostAndPort()); - } - } - - response->setReplSetName(_rsConfig.getReplSetName()); - if (myState.removed()) { - response->markAsNoConfig(); - return; - } - - 
response->setReplSetVersion(_rsConfig.getConfigVersion()); - response->setIsMaster(myState.primary()); - response->setIsSecondary(myState.secondary()); - - const MemberConfig* curPrimary = _currentPrimaryMember(); - if (curPrimary) { - response->setPrimary(curPrimary->getHostAndPort()); - } - - const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); - if (selfConfig.isArbiter()) { - response->setIsArbiterOnly(true); - } else if (selfConfig.getPriority() == 0) { - response->setIsPassive(true); - } - if (selfConfig.getSlaveDelay() > Seconds(0)) { - response->setSlaveDelay(selfConfig.getSlaveDelay()); - } - if (selfConfig.isHidden()) { - response->setIsHidden(true); - } - if (!selfConfig.shouldBuildIndexes()) { - response->setShouldBuildIndexes(false); - } - const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig(); - if (selfConfig.hasTags(tagConfig)) { - for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd(); - ++tag) { - std::string tagKey = tagConfig.getTagKey(*tag); - if (tagKey[0] == '$') { - // Filter out internal tags - continue; - } - response->addTag(tagKey, tagConfig.getTagValue(*tag)); - } - } - response->setMe(selfConfig.getHostAndPort()); - if (_iAmPrimary()) { - response->setElectionId(_electionId); - } -} - -StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult> -TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) { - if (_role != TopologyCoordinator::Role::follower) { - std::string msg = str::stream() - << "cannot freeze node when primary or running for election. state: " - << (_role == TopologyCoordinator::Role::leader ? "Primary" : "Running-Election"); - log() << msg; - return Status(ErrorCodes::NotSecondary, msg); - } - - if (secs == 0) { - _stepDownUntil = now; - log() << "'unfreezing'"; - response->append("info", "unfreezing"); - - if (_isElectableNodeInSingleNodeReplicaSet()) { - // If we are a one-node replica set, we're the one member, - // we're electable, we're not in maintenance mode, and we are currently in followerMode - // SECONDARY, we must transition to candidate now that our stepdown period - // is no longer active, in leiu of heartbeats. - _role = Role::candidate; - return PrepareFreezeResponseResult::kElectSelf; - } - } else { - if (secs == 1) - response->append("warning", "you really want to freeze for only 1 second?"); - - _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs)); - log() << "'freezing' for " << secs << " seconds"; - } - - return PrepareFreezeResponseResult::kNoAction; -} - -bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) { - if (_stepDownUntil > now) { - return false; - } - - if (_isElectableNodeInSingleNodeReplicaSet()) { - // If the new config describes a one-node replica set, we're the one member, - // we're electable, we're not in maintenance mode, and we are currently in followerMode - // SECONDARY, we must transition to candidate, in leiu of heartbeats. 
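// A sketch (not from the MongoDB sources) of the freeze-window arithmetic in
// prepareFreezeResponse() above, using std::chrono types as stand-ins for Date_t and
// Seconds. 'replSetFreeze 0' clears the window immediately; any other value can only
// extend an existing window, never shorten it, because of the std::max().
#include <algorithm>
#include <chrono>

using Clock = std::chrono::system_clock;

Clock::time_point applyFreeze(Clock::time_point stepDownUntil, Clock::time_point now, int secs) {
    if (secs == 0) {
        return now;  // "unfreezing": the stepdown window ends right away
    }
    const Clock::time_point deadline = now + std::chrono::seconds(secs);
    return std::max(stepDownUntil, deadline);  // "freezing": keep the later deadline
}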
- _role = Role::candidate; - return true; - } - return false; -} - -void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) { - if (_electionSleepUntil < newTime) { - _electionSleepUntil = newTime; - } -} - -Timestamp TopologyCoordinatorImpl::getElectionTime() const { - return _electionTime; -} - -OID TopologyCoordinatorImpl::getElectionId() const { - return _electionId; -} - -int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const { - return _currentPrimaryIndex; -} - -Date_t TopologyCoordinatorImpl::getStepDownTime() const { - return _stepDownUntil; -} - -void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, - int selfIndex, - Date_t now) { - std::vector<MemberData> oldHeartbeats; - _memberData.swap(oldHeartbeats); - - int index = 0; - for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd(); - ++it, ++index) { - const MemberConfig& newMemberConfig = *it; - MemberData newHeartbeatData; - for (auto&& oldMemberData : oldHeartbeats) { - if ((oldMemberData.getMemberId() == newMemberConfig.getId() && - oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) || - (index == selfIndex && oldMemberData.isSelf())) { - // This member existed in the old config with the same member ID and - // HostAndPort, so copy its heartbeat data over. - newHeartbeatData = oldMemberData; - break; - } - } - newHeartbeatData.setConfigIndex(index); - newHeartbeatData.setIsSelf(index == selfIndex); - newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort()); - newHeartbeatData.setMemberId(newMemberConfig.getId()); - _memberData.push_back(newHeartbeatData); - } - if (selfIndex < 0) { - // It's necessary to have self member data even if self isn't in the configuration. - // We don't need data for the other nodes (which no longer know about us, or soon won't) - _memberData.clear(); - // We're not in the config, we can't sync any more. - _syncSource = HostAndPort(); - MemberData newHeartbeatData; - for (auto&& oldMemberData : oldHeartbeats) { - if (oldMemberData.isSelf()) { - newHeartbeatData = oldMemberData; - break; - } - } - newHeartbeatData.setConfigIndex(-1); - newHeartbeatData.setIsSelf(true); - _memberData.push_back(newHeartbeatData); - } -} - -// This function installs a new config object and recreates MemberData objects -// that reflect the new config. -void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig, - int selfIndex, - Date_t now) { - invariant(_role != Role::candidate); - invariant(selfIndex < newConfig.getNumMembers()); - - // Reset term on startup and upgrade/downgrade of protocol version. - if (!_rsConfig.isInitialized() || - _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) { - if (newConfig.getProtocolVersion() == 1) { - _term = OpTime::kInitialTerm; - } else { - invariant(newConfig.getProtocolVersion() == 0); - _term = OpTime::kUninitializedTerm; - } - LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config"; - } - - _updateHeartbeatDataForReconfig(newConfig, selfIndex, now); - _rsConfig = newConfig; - _selfIndex = selfIndex; - _forceSyncSourceIndex = -1; - - if (_role == Role::leader) { - if (_selfIndex == -1) { - log() << "Could not remain primary because no longer a member of the replica set"; - } else if (!_selfConfig().isElectable()) { - log() << " Could not remain primary because no longer electable"; - } else { - // Don't stepdown if you don't have to. 
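// A sketch (not from the MongoDB sources) of the carry-over rule used by
// _updateHeartbeatDataForReconfig() above, with plain structs standing in for
// MemberConfig and MemberData. Heartbeat state survives a reconfig only when both the
// member id and the host:port are unchanged, or when the entry is ourself at selfIndex.
#include <string>

struct OldEntry { int memberId; std::string hostAndPort; bool isSelf; };
struct NewMember { int id; std::string hostAndPort; };

bool carriesOverHeartbeatData(const OldEntry& oldEntry,
                              const NewMember& newMember,
                              int newIndex,
                              int selfIndex) {
    return (oldEntry.memberId == newMember.id &&
            oldEntry.hostAndPort == newMember.hostAndPort) ||
           (newIndex == selfIndex && oldEntry.isSelf);
}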
- _currentPrimaryIndex = _selfIndex; - return; - } - _role = Role::follower; - _setLeaderMode(LeaderMode::kNotLeader); - } - - // By this point we know we are in Role::follower - _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is - - if (_isElectableNodeInSingleNodeReplicaSet()) { - // If the new config describes a one-node replica set, we're the one member, - // we're electable, we're not in maintenance mode and we are currently in followerMode - // SECONDARY, we must transition to candidate, in leiu of heartbeats. - _role = Role::candidate; - } -} -std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const { - // ignore messages over 2 minutes old - if ((now - _hbmsgTime) > Seconds{120}) { - return ""; - } - return _hbmsg; -} - -void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) { - _hbmsgTime = now; - _hbmsg = message; -} - -const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const { - return _rsConfig.getMemberAt(_selfIndex); -} - -const MemberData& TopologyCoordinatorImpl::_selfMemberData() const { - return _memberData[_selfMemberDataIndex()]; -} - -const int TopologyCoordinatorImpl::_selfMemberDataIndex() const { - invariant(!_memberData.empty()); - if (_selfIndex >= 0) - return _selfIndex; - // In master-slave mode, the first entry is for self. If there is no config - // or we're not in the config, the first-and-only entry should be for self. - return 0; -} - -TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason( - int index) const { - invariant(index != _selfIndex); - const MemberConfig& memberConfig = _rsConfig.getMemberAt(index); - const MemberData& hbData = _memberData.at(index); - UnelectableReasonMask result = None; - if (memberConfig.isArbiter()) { - result |= ArbiterIAm; - } - if (memberConfig.getPriority() <= 0) { - result |= NoPriority; - } - if (hbData.getState() != MemberState::RS_SECONDARY) { - result |= NotSecondary; - } - if (_rsConfig.getProtocolVersion() == 0 && - !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) { - result |= NotCloseEnoughToLatestOptime; - } - if (hbData.up() && hbData.isUnelectable()) { - result |= RefusesToStand; - } - invariant(result || memberConfig.isElectable()); - return result; -} - -TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason( - const Date_t now, StartElectionReason reason) const { - UnelectableReasonMask result = None; - const OpTime lastApplied = getMyLastAppliedOpTime(); - if (lastApplied.isNull()) { - result |= NoData; - } - if (!_aMajoritySeemsToBeUp()) { - result |= CannotSeeMajority; - } - if (_selfIndex == -1) { - result |= NotInitialized; - return result; - } - if (_selfConfig().isArbiter()) { - result |= ArbiterIAm; - } - if (_selfConfig().getPriority() <= 0) { - result |= NoPriority; - } - if (_stepDownUntil > now) { - result |= StepDownPeriodActive; - } - - // Cannot be electable unless secondary or already primary - if (!getMemberState().secondary() && !_iAmPrimary()) { - result |= NotSecondary; - } - - if (_rsConfig.getProtocolVersion() == 0) { - // Election rules only for protocol version 0. 
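// Illustrative only, not part of the MongoDB code: UnelectableReasonMask is an int used
// as a bit set, so a node can be unelectable for several reasons at once. The flag
// values below mirror the UnelectableReason enum in the header removed later in this
// diff; testing an individual reason is a bitwise AND, as _getUnelectableReasonString()
// does.
enum UnelectableReason {
    None = 0,
    CannotSeeMajority = 1 << 0,
    NotCloseEnoughToLatestOptime = 1 << 1,
    ArbiterIAm = 1 << 2,
    NotSecondary = 1 << 3,
};
typedef int UnelectableReasonMask;

inline bool hasReason(UnelectableReasonMask mask, UnelectableReason reason) {
    return (mask & reason) != 0;
}
// For example, an arbiter that is also partitioned from the majority would carry
// (ArbiterIAm | CannotSeeMajority), and hasReason(mask, ArbiterIAm) would be true.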
- if (_voteLease.whoId != -1 && - _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() && - _voteLease.when + VoteLease::leaseTime >= now) { - result |= VotedTooRecently; - } - if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) { - result |= NotCloseEnoughToLatestOptime; - } - } else { - // Election rules only for protocol version 1. - invariant(_rsConfig.getProtocolVersion() == 1); - if (reason == StartElectionReason::kPriorityTakeover && - !_amIFreshEnoughForPriorityTakeover()) { - result |= NotCloseEnoughToLatestForPriorityTakeover; - } - - if (reason == StartElectionReason::kCatchupTakeover && - !_amIFreshEnoughForCatchupTakeover()) { - result |= NotFreshEnoughForCatchupTakeover; - } - } - return result; -} - -std::string TopologyCoordinatorImpl::_getUnelectableReasonString( - const UnelectableReasonMask ur) const { - invariant(ur); - str::stream ss; - bool hasWrittenToStream = false; - if (ur & NoData) { - ss << "node has no applied oplog entries"; - hasWrittenToStream = true; - } - if (ur & VotedTooRecently) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "I recently voted for " << _voteLease.whoHostAndPort.toString(); - } - if (ur & CannotSeeMajority) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "I cannot see a majority"; - } - if (ur & ArbiterIAm) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member is an arbiter"; - } - if (ur & NoPriority) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member has zero priority"; - } - if (ur & StepDownPeriodActive) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "I am still waiting for stepdown period to end at " - << dateToISOStringLocal(_stepDownUntil); - } - if (ur & NotSecondary) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member is not currently a secondary"; - } - if (ur & NotCloseEnoughToLatestOptime) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member is more than 10 seconds behind the most up-to-date member"; - } - if (ur & NotCloseEnoughToLatestForPriorityTakeover) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member is not caught up enough to the most up-to-date member to call for priority " - "takeover - must be within " - << priorityTakeoverFreshnessWindowSeconds << " seconds"; - } - if (ur & NotFreshEnoughForCatchupTakeover) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "member is either not the most up-to-date member or not ahead of the primary, and " - "therefore cannot call for catchup takeover"; - } - if (ur & NotInitialized) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "node is not a member of a valid replica set configuration"; - } - if (ur & RefusesToStand) { - if (hasWrittenToStream) { - ss << "; "; - } - hasWrittenToStream = true; - ss << "most recent heartbeat indicates node will not stand for election"; - } - if (!hasWrittenToStream) { - severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur); - fassertFailed(26011); - } - ss << " (mask 0x" << integerToHex(ur) << ")"; - return ss; -} - -Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) { - return _pings[host].getMillis(); -} - -void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) { - _electionTime = 
newElectionTime; -} - -int TopologyCoordinatorImpl::_getTotalPings() { - PingMap::iterator it = _pings.begin(); - PingMap::iterator end = _pings.end(); - int totalPings = 0; - while (it != end) { - totalPings += it->second.getCount(); - it++; - } - return totalPings; -} - -std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const { - std::vector<HostAndPort> upHosts; - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - if (itIndex == _selfIndex) { - continue; // skip ourselves - } - if (!it->maybeUp()) { - continue; // skip DOWN nodes - } - - upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort()); - } - return upHosts; -} - -bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { - if (_role != Role::candidate) { - return false; - } - int selfId = _selfConfig().getId(); - if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) { - log() << "not voting yea for " << selfId << " voted for " - << _voteLease.whoHostAndPort.toString() << ' ' - << durationCount<Seconds>(now - _voteLease.when) << " secs ago"; - return false; - } - _voteLease.when = now; - _voteLease.whoId = selfId; - _voteLease.whoHostAndPort = _selfConfig().getHostAndPort(); - return true; -} - -bool TopologyCoordinatorImpl::isSteppingDown() const { - return _leaderMode == LeaderMode::kAttemptingStepDown || - _leaderMode == LeaderMode::kSteppingDown; -} - -void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) { - // Invariants for valid state transitions. - switch (_leaderMode) { - case LeaderMode::kNotLeader: - invariant(newMode == LeaderMode::kLeaderElect); - break; - case LeaderMode::kLeaderElect: - invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case - newMode == LeaderMode::kMaster || - newMode == LeaderMode::kAttemptingStepDown || - newMode == LeaderMode::kSteppingDown); - break; - case LeaderMode::kMaster: - invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case - newMode == LeaderMode::kAttemptingStepDown || - newMode == LeaderMode::kSteppingDown); - break; - case LeaderMode::kAttemptingStepDown: - invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster || - newMode == LeaderMode::kSteppingDown); - break; - case LeaderMode::kSteppingDown: - invariant(newMode == LeaderMode::kNotLeader); - break; - } - _leaderMode = std::move(newMode); -} - -MemberState TopologyCoordinatorImpl::getMemberState() const { - if (_selfIndex == -1) { - if (_rsConfig.isInitialized()) { - return MemberState::RS_REMOVED; - } - return MemberState::RS_STARTUP; - } - - if (_rsConfig.isConfigServer()) { - if (_options.clusterRole != ClusterRole::ConfigServer) { - return MemberState::RS_REMOVED; - } else { - invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown); - if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) { - return MemberState::RS_REMOVED; - } - } - } else { - if (_options.clusterRole == ClusterRole::ConfigServer) { - return MemberState::RS_REMOVED; - } - } - - if (_role == Role::leader) { - invariant(_currentPrimaryIndex == _selfIndex); - invariant(_leaderMode != LeaderMode::kNotLeader); - return MemberState::RS_PRIMARY; - } - const MemberConfig& myConfig = _selfConfig(); - if (myConfig.isArbiter()) { - return MemberState::RS_ARBITER; - } - if (((_maintenanceModeCalls > 0) || 
(_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) && - (_followerMode == MemberState::RS_SECONDARY)) { - return MemberState::RS_RECOVERING; - } - return _followerMode; -} - -bool TopologyCoordinatorImpl::canAcceptWrites() const { - return _leaderMode == LeaderMode::kMaster; -} - -void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) { - invariant(_role == Role::leader); - _electionTime = electionOpTime; - _electionId = electionId; -} - -void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) { - invariant(_role == Role::candidate); - invariant(_leaderMode == LeaderMode::kNotLeader); - _role = Role::leader; - _setLeaderMode(LeaderMode::kLeaderElect); - setElectionInfo(electionId, electionOpTime); - _currentPrimaryIndex = _selfIndex; - _syncSource = HostAndPort(); - _forceSyncSourceIndex = -1; - // Prevent last committed optime from updating until we finish draining. - _firstOpTimeOfMyTerm = - OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max()); -} - -void TopologyCoordinatorImpl::processLoseElection() { - invariant(_role == Role::candidate); - invariant(_leaderMode == LeaderMode::kNotLeader); - const HostAndPort syncSourceAddress = getSyncSourceAddress(); - _electionTime = Timestamp(0, 0); - _electionId = OID(); - _role = Role::follower; - - // Clear voteLease time, if we voted for ourselves in this election. - // This will allow us to vote for others. - if (_voteLease.whoId == _selfConfig().getId()) { - _voteLease.when = Date_t(); - } -} - -bool TopologyCoordinatorImpl::attemptStepDown( - long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) { - - if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) { - uasserted(ErrorCodes::PrimarySteppedDown, - "While waiting for secondaries to catch up before stepping down, " - "this node decided to step down for other reasons"); - } - invariant(_leaderMode == LeaderMode::kAttemptingStepDown); - - if (now >= stepDownUntil) { - uasserted(ErrorCodes::ExceededTimeLimit, - "By the time we were ready to step down, we were already past the " - "time we were supposed to step down until"); - } - - if (!_canCompleteStepDownAttempt(now, waitUntil, force)) { - // Stepdown attempt failed. - - // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even - // if secondaryCatchUpPeriodSecs == 0. - if (now >= waitUntil) { - uasserted(ErrorCodes::ExceededTimeLimit, - str::stream() << "No electable secondaries caught up as of " - << dateToISOStringLocal(now) - << "Please use the replSetStepDown command with the argument " - << "{force: true} to force node to step down."); - } - - // Stepdown attempt failed, but in a way that can be retried - return false; - } - - // Stepdown attempt success! 
- _stepDownUntil = stepDownUntil; - _stepDownSelfAndReplaceWith(-1); - return true; -} - -bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now, - Date_t waitUntil, - bool force) { - const bool forceNow = force && (now >= waitUntil); - if (forceNow) { - return true; - } - - return isSafeToStepDown(); -} - -bool TopologyCoordinatorImpl::isSafeToStepDown() { - if (!_rsConfig.isInitialized() || _selfIndex < 0) { - return false; - } - - OpTime lastApplied = getMyLastAppliedOpTime(); - - auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName); - invariant(tagStatus.isOK()); - - // Check if a majority of nodes have reached the last applied optime. - if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) { - return false; - } - - // Now check that we also have at least one caught up node that is electable. - const OpTime lastOpApplied = getMyLastAppliedOpTime(); - for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) { - // ignore your self - if (memberIndex == _selfIndex) { - continue; - } - UnelectableReasonMask reason = _getUnelectableReason(memberIndex); - if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) { - // Found a caught up and electable node, succeed with step down. - return true; - } - } - - return false; -} - -void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) { - invariant(_role == Role::follower); - switch (newMode) { - case MemberState::RS_RECOVERING: - case MemberState::RS_ROLLBACK: - case MemberState::RS_SECONDARY: - case MemberState::RS_STARTUP2: - _followerMode = newMode; - break; - default: - invariant(false); - } - - if (_followerMode != MemberState::RS_SECONDARY) { - return; - } - - // When a single node replica set transitions to SECONDARY, we must check if we should - // be a candidate here. This is necessary because a single node replica set has no - // heartbeats that would normally change the role to candidate. - - if (_isElectableNodeInSingleNodeReplicaSet()) { - _role = Role::candidate; - } -} - -bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const { - return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 && - _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() && - _maintenanceModeCalls == 0; -} - -void TopologyCoordinatorImpl::finishUnconditionalStepDown() { - invariant(_leaderMode == LeaderMode::kSteppingDown); - - int remotePrimaryIndex = -1; - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - if (itIndex == _selfIndex) { - continue; - } - - if (it->getState().primary() && it->up()) { - if (remotePrimaryIndex != -1) { - // two other nodes think they are primary (asynchronously polled) - // -- wait for things to settle down. 
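// A simplified sketch (not from the MongoDB sources) of the two conditions checked by
// isSafeToStepDown() above. It counts nodes instead of evaluating the majority
// write-concern tag and ignores voter/priority details: a clean stepdown needs a
// majority of members at or beyond the primary's last applied optime, plus at least one
// fully caught-up member that is electable.
#include <cstddef>
#include <vector>

struct PeerState { long long appliedOpTime; bool electable; };

bool safeToStepDown(long long myLastApplied,
                    const std::vector<PeerState>& peers,
                    std::size_t writeMajority) {
    std::size_t caughtUp = 1;  // the primary itself counts toward the majority
    bool electableCaughtUp = false;
    for (const PeerState& p : peers) {
        if (p.appliedOpTime >= myLastApplied) {
            ++caughtUp;
            if (p.electable) {
                electableCaughtUp = true;
            }
        }
    }
    return caughtUp >= writeMajority && electableCaughtUp;
}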
- remotePrimaryIndex = -1; - warning() << "two remote primaries (transiently)"; - break; - } - remotePrimaryIndex = itIndex; - } - } - _stepDownSelfAndReplaceWith(remotePrimaryIndex); -} - -void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) { - invariant(_role == Role::leader); - invariant(_selfIndex != -1); - invariant(_selfIndex != newPrimary); - invariant(_selfIndex == _currentPrimaryIndex); - _currentPrimaryIndex = newPrimary; - _role = Role::follower; - _setLeaderMode(LeaderMode::kNotLeader); -} - -bool TopologyCoordinatorImpl::updateLastCommittedOpTime() { - // If we're not primary or we're stepping down due to learning of a new term then we must not - // advance the commit point. If we are stepping down due to a user request, however, then it - // is safe to advance the commit point, and in fact we must since the stepdown request may be - // waiting for the commit point to advance enough to be able to safely complete the step down. - if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) { - return false; - } - - // Whether we use the applied or durable OpTime for the commit point is decided here. - const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal(); - - std::vector<OpTime> votingNodesOpTimes; - for (const auto& memberData : _memberData) { - int memberIndex = memberData.getConfigIndex(); - invariant(memberIndex >= 0); - const auto& memberConfig = _rsConfig.getMemberAt(memberIndex); - if (memberConfig.isVoter()) { - const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime() - : memberData.getLastAppliedOpTime(); - votingNodesOpTimes.push_back(opTime); - } - } - - invariant(votingNodesOpTimes.size() > 0); - if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) { - return false; - } - std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); - - // need the majority to have this OpTime - OpTime committedOpTime = - votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; - return advanceLastCommittedOpTime(committedOpTime); -} - -bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) { - if (committedOpTime == _lastCommittedOpTime) { - return false; // Hasn't changed, so ignore it. - } else if (committedOpTime < _lastCommittedOpTime) { - LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime - << ", currentCommittedOpTime: " << _lastCommittedOpTime; - return false; // This may have come from an out-of-order heartbeat. Ignore it. - } - - // This check is performed to ensure primaries do not commit an OpTime from a previous term. - if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) { - LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: " - << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm; - return false; - } - - LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime; - _lastCommittedOpTime = committedOpTime; - return true; -} - -OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const { - return _lastCommittedOpTime; -} - -bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary( - long long termWhenDrainCompleted) const { - - if (termWhenDrainCompleted != _term) { - return false; - } - // Allow completing the transition to primary even when in the middle of a stepdown attempt, - // in case the stepdown attempt fails. 
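// A sketch (not from the MongoDB sources) of the majority commit point selection done by
// updateLastCommittedOpTime() above, with long long standing in for OpTime. It assumes
// the caller has already verified that at least 'writeMajority' voting nodes reported an
// optime, as the real code does. After an ascending sort, the entry at index
// (size - writeMajority) is the newest optime that at least 'writeMajority' voters have
// reached.
#include <algorithm>
#include <cstddef>
#include <vector>

long long majorityCommitPoint(std::vector<long long> votingNodeOpTimes,
                              std::size_t writeMajority) {
    std::sort(votingNodeOpTimes.begin(), votingNodeOpTimes.end());
    return votingNodeOpTimes[votingNodeOpTimes.size() - writeMajority];
}
// For example, optimes {5, 7, 9, 9, 12} with writeMajority 3 select index 2, i.e. 9:
// three of the five voters (9, 9, 12) are at or beyond optime 9.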
- if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) { - return false; - } - - return true; -} - -Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) { - if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) { - return Status(ErrorCodes::PrimarySteppedDown, - "By the time this node was ready to complete its transition to PRIMARY it " - "was no longer eligible to do so"); - } - if (_leaderMode == LeaderMode::kLeaderElect) { - _setLeaderMode(LeaderMode::kMaster); - } - _firstOpTimeOfMyTerm = firstOpTimeOfTerm; - return Status::OK(); -} - -void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) { - invariant(_role == Role::follower); - _maintenanceModeCalls += inc; - invariant(_maintenanceModeCalls >= 0); -} - -int TopologyCoordinatorImpl::getMaintenanceCount() const { - return _maintenanceModeCalls; -} - -TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term, - Date_t now) { - if (term <= _term) { - return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate; - } - // Don't run election if we just stood up or learned about a new term. - _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod(); - - // Don't update the term just yet if we are going to step down, as we don't want to report - // that we are primary in the new term. - if (_iAmPrimary()) { - return TopologyCoordinator::UpdateTermResult::kTriggerStepDown; - } - LOG(1) << "Updating term from " << _term << " to " << term; - _term = term; - return TopologyCoordinator::UpdateTermResult::kUpdatedTerm; -} - - -long long TopologyCoordinatorImpl::getTerm() { - return _term; -} - -// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the -// replset. Passing metadata is unnecessary. -bool TopologyCoordinatorImpl::shouldChangeSyncSource( - const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const { - // Methodology: - // If there exists a viable sync source member other than currentSource, whose oplog has - // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's, - // return true. - // If the currentSource has the same replication progress as we do and has no source for further - // progress, return true. - - if (_selfIndex == -1) { - log() << "Not choosing new sync source because we are not in the config."; - return false; - } - - // If the user requested a sync source change, return true. - if (_forceSyncSourceIndex != -1) { - log() << "Choosing new sync source because the user has requested to use " - << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort() - << " as a sync source"; - return true; - } - - if (_rsConfig.getProtocolVersion() == 1 && - replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { - log() << "Choosing new sync source because the config version supplied by " << currentSource - << ", " << replMetadata.getConfigVersion() << ", does not match ours, " - << _rsConfig.getConfigVersion(); - return true; - } - - const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); - // PV0 doesn't use metadata, we have to consult _rsConfig. 
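// Illustrative only, not part of the MongoDB code: the lag test applied further down in
// shouldChangeSyncSource(), reduced to plain integers. The current sync source is
// abandoned only when some other eligible member's applied optime is more than
// maxSyncSourceLagSecs seconds (of optime timestamp) ahead of the sync source's.
bool syncSourceLagsTooFar(unsigned int sourceOpTimeSecs,
                          unsigned int candidateOpTimeSecs,
                          unsigned int maxSyncSourceLagSecs) {
    const unsigned int goalSecs = sourceOpTimeSecs + maxSyncSourceLagSecs;
    return candidateOpTimeSecs > goalSecs;
}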
- if (currentSourceIndex == -1) { - log() << "Choosing new sync source because " << currentSource.toString() - << " is not in our config"; - return true; - } - - invariant(currentSourceIndex != _selfIndex); - - // If OplogQueryMetadata was provided, use its values, otherwise use the ones in - // ReplSetMetadata. - OpTime currentSourceOpTime; - int syncSourceIndex = -1; - int primaryIndex = -1; - if (oqMetadata) { - currentSourceOpTime = - std::max(oqMetadata->getLastOpApplied(), - _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); - syncSourceIndex = oqMetadata->getSyncSourceIndex(); - primaryIndex = oqMetadata->getPrimaryIndex(); - } else { - currentSourceOpTime = - std::max(replMetadata.getLastOpVisible(), - _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); - syncSourceIndex = replMetadata.getSyncSourceIndex(); - primaryIndex = replMetadata.getPrimaryIndex(); - } - - if (currentSourceOpTime.isNull()) { - // Haven't received a heartbeat from the sync source yet, so can't tell if we should - // change. - return false; - } - - // Change sync source if they are not ahead of us, and don't have a sync source, - // unless they are primary. - const OpTime myLastOpTime = getMyLastAppliedOpTime(); - if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 && - currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) { - std::stringstream logMessage; - logMessage << "Choosing new sync source because our current sync source, " - << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime - << ") which is not ahead of ours (" << myLastOpTime - << "), it does not have a sync source, and it's not the primary"; - if (primaryIndex >= 0) { - logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)"; - } else { - logMessage << " (sync source does not know the primary)"; - } - log() << logMessage.str(); - return true; - } - - if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) { - log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent " - "OpTime, " - << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource - << ", against the OpTimes of the other nodes in this replica set."; - } else { - unsigned int currentSecs = currentSourceOpTime.getSecs(); - unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs); - - for (std::vector<MemberData>::const_iterator it = _memberData.begin(); - it != _memberData.end(); - ++it) { - const int itIndex = indexOfIterator(_memberData, it); - const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex); - if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) && - (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) && - it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) && - goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) { - log() << "Choosing new sync source because the most recent OpTime of our sync " - "source, " - << currentSource << ", is " << currentSourceOpTime.toString() - << " which is more than " << _options.maxSyncSourceLagSecs - << " behind member " << candidateConfig.getHostAndPort().toString() - << " whose most recent OpTime is " - << it->getHeartbeatAppliedOpTime().toString(); - invariant(itIndex != _selfIndex); - return true; - } - } - } - - return false; -} - -rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata( - const OpTime& lastVisibleOpTime) const { - return 
rpc::ReplSetMetadata(_term, - _lastCommittedOpTime, - lastVisibleOpTime, - _rsConfig.getConfigVersion(), - _rsConfig.getReplicaSetId(), - _currentPrimaryIndex, - _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); -} - -rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const { - return rpc::OplogQueryMetadata(_lastCommittedOpTime, - getMyLastAppliedOpTime(), - rbid, - _currentPrimaryIndex, - _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); -} - -void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { - output->setConfig(_rsConfig); - output->setHBData(_memberData); - output->setSelfIndex(_selfIndex); - output->setPrimaryIndex(_currentPrimaryIndex); - output->setSelfState(getMemberState()); - output->setSelfHeartbeatMessage(_hbmsg); -} - -void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) { - response->setTerm(_term); - - if (args.getTerm() < _term) { - response->setVoteGranted(false); - response->setReason(str::stream() << "candidate's term (" << args.getTerm() - << ") is lower than mine (" - << _term - << ")"); - } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) { - response->setVoteGranted(false); - response->setReason(str::stream() << "candidate's config version (" - << args.getConfigVersion() - << ") differs from mine (" - << _rsConfig.getConfigVersion() - << ")"); - } else if (args.getSetName() != _rsConfig.getReplSetName()) { - response->setVoteGranted(false); - response->setReason(str::stream() << "candidate's set name (" << args.getSetName() - << ") differs from mine (" - << _rsConfig.getReplSetName() - << ")"); - } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) { - response->setVoteGranted(false); - response - ->setReason(str::stream() - << "candidate's data is staler than mine. 
candidate's last applied OpTime: " - << args.getLastDurableOpTime().toString() - << ", my last applied OpTime: " - << getMyLastAppliedOpTime().toString()); - } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) { - response->setVoteGranted(false); - response->setReason(str::stream() - << "already voted for another candidate (" - << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort() - << ") this term (" - << _lastVote.getTerm() - << ")"); - } else { - int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex()); - if (_selfConfig().isArbiter() && betterPrimary >= 0) { - response->setVoteGranted(false); - response->setReason(str::stream() - << "can see a healthy primary (" - << _rsConfig.getMemberAt(betterPrimary).getHostAndPort() - << ") of equal or greater priority"); - } else { - if (!args.isADryRun()) { - _lastVote.setTerm(args.getTerm()); - _lastVote.setCandidateIndex(args.getCandidateIndex()); - } - response->setVoteGranted(true); - } - } -} - -void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) { - _lastVote = lastVote; -} - -void TopologyCoordinatorImpl::voteForMyselfV1() { - _lastVote.setTerm(_term); - _lastVote.setCandidateIndex(_selfIndex); -} - -void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) { - _currentPrimaryIndex = primaryIndex; -} - -Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now, - StartElectionReason reason) { - if (_role == Role::leader) { - return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"}; - } - - if (_role == Role::candidate) { - return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"}; - } - - const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason); - if (unelectableReason) { - return {ErrorCodes::NodeNotElectable, - str::stream() << "Not standing for election because " - << _getUnelectableReasonString(unelectableReason)}; - } - - // All checks passed, become a candidate and start election proceedings. - _role = Role::candidate; - - return Status::OK(); -} - -void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) { - _storageEngineSupportsReadCommitted = - supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; -} - -void TopologyCoordinatorImpl::restartHeartbeats() { - for (auto& hb : _memberData) { - hb.restart(); - } -} - -boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { - // The smallest OpTime in PV1. - OpTime latest(Timestamp(0, 0), 0); - for (size_t i = 0; i < _memberData.size(); i++) { - auto& peer = _memberData[i]; - - if (static_cast<int>(i) == _selfIndex) { - continue; - } - // If any heartbeat is not fresh enough, return none. - if (!peer.isUpdatedSinceRestart()) { - return boost::none; - } - // Ignore down members - if (!peer.up()) { - continue; - } - if (peer.getHeartbeatAppliedOpTime() > latest) { - latest = peer.getHeartbeatAppliedOpTime(); - } - } - return latest; -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h deleted file mode 100644 index 046ff2217a6..00000000000 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ /dev/null @@ -1,529 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> -#include <vector> - -#include "mongo/bson/timestamp.h" -#include "mongo/db/repl/last_vote.h" -#include "mongo/db/repl/member_data.h" -#include "mongo/db/repl/member_state.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/topology_coordinator.h" -#include "mongo/db/server_options.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class OperationContext; - -namespace rpc { -class ReplSetMetadata; -} // namespace rpc - -namespace repl { - -static const Milliseconds UninitializedPing{-1}; - -/** - * Represents a latency measurement for each replica set member based on heartbeat requests. - * The measurement is an average weighted 80% to the old value, and 20% to the new value. - * - * Also stores information about heartbeat progress and retries. - */ -class PingStats { -public: - /** - * Records that a new heartbeat request started at "now". - * - * This resets the failure count used in determining whether the next request to a target - * should be a retry or a regularly scheduled heartbeat message. - */ - void start(Date_t now); - - /** - * Records that a heartbeat request completed successfully, and that "millis" milliseconds - * were spent for a single network roundtrip plus remote processing time. - */ - void hit(Milliseconds millis); - - /** - * Records that a heartbeat request failed. - */ - void miss(); - - /** - * Gets the number of hit() calls. - */ - unsigned int getCount() const { - return count; - } - - /** - * Gets the weighted average round trip time for heartbeat messages to the target. - * Returns 0 if there have been no pings recorded yet. - */ - Milliseconds getMillis() const { - return value == UninitializedPing ? Milliseconds(0) : value; - } - - /** - * Gets the date at which start() was last called, which is used to determine if - * a heartbeat should be retried or if the time limit has expired. 
- */ - Date_t getLastHeartbeatStartDate() const { - return _lastHeartbeatStartDate; - } - - /** - * Gets the number of failures since start() was last called. - * - * This value is incremented by calls to miss(), cleared by calls to start() and - * set to the maximum possible value by calls to hit(). - */ - int getNumFailuresSinceLastStart() const { - return _numFailuresSinceLastStart; - } - -private: - unsigned int count = 0; - Milliseconds value = UninitializedPing; - Date_t _lastHeartbeatStartDate; - int _numFailuresSinceLastStart = std::numeric_limits<int>::max(); -}; - -class TopologyCoordinatorImpl : public TopologyCoordinator { -public: - struct Options { - // A sync source is re-evaluated after it lags behind further than this amount. - Seconds maxSyncSourceLagSecs{0}; - - // Whether or not this node is running as a config server. - ClusterRole clusterRole{ClusterRole::None}; - }; - - /** - * Constructs a Topology Coordinator object. - **/ - TopologyCoordinatorImpl(Options options); - - //////////////////////////////////////////////////////////// - // - // Implementation of TopologyCoordinator interface - // - //////////////////////////////////////////////////////////// - - virtual Role getRole() const; - virtual MemberState getMemberState() const; - virtual bool canAcceptWrites() const override; - virtual bool isSteppingDown() const override; - virtual HostAndPort getSyncSourceAddress() const; - virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; - virtual int getMaintenanceCount() const; - virtual long long getTerm(); - virtual UpdateTermResult updateTerm(long long term, Date_t now); - virtual void setForceSyncSourceIndex(int index); - virtual HostAndPort chooseNewSyncSource(Date_t now, - const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); - virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); - virtual void clearSyncSourceBlacklist(); - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const; - virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); - virtual void setElectionSleepUntil(Date_t newTime); - virtual void setFollowerMode(MemberState::MS newMode); - virtual bool updateLastCommittedOpTime(); - virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime); - virtual OpTime getLastCommittedOpTime() const; - virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override; - virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override; - virtual void adjustMaintenanceCountBy(int inc); - virtual void prepareSyncFromResponse(const HostAndPort& target, - BSONObjBuilder* response, - Status* result); - virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual Status prepareHeartbeatResponse(Date_t now, - const ReplSetHeartbeatArgs& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual Status prepareHeartbeatResponseV1(Date_t now, - const ReplSetHeartbeatArgsV1& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual 
void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, - BSONObjBuilder* response, - Status* result); - virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, - OpTime currentCommittedSnapshotOpTime) const; - - virtual void fillIsMasterForReplSet(IsMasterResponse* response); - virtual void fillMemberData(BSONObjBuilder* result); - virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, - int secs, - BSONObjBuilder* response); - virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); - virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( - Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( - Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual HeartbeatResponseAction processHeartbeatResponse( - Date_t now, - Milliseconds networkRoundTripTime, - const HostAndPort& target, - const StatusWith<ReplSetHeartbeatResponse>& hbResponse); - virtual bool voteForMyself(Date_t now); - virtual void setElectionInfo(OID electionId, Timestamp electionOpTime); - virtual void processWinElection(OID electionId, Timestamp electionOpTime); - virtual void processLoseElection(); - virtual Status checkShouldStandForElection(Date_t now) const; - virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message); - virtual bool attemptStepDown(long long termAtStart, - Date_t now, - Date_t waitUntil, - Date_t stepDownUntil, - bool force) override; - virtual bool isSafeToStepDown() override; - virtual bool prepareForUnconditionalStepDown() override; - virtual Status prepareForStepDownAttempt() override; - virtual void abortAttemptedStepDownIfNeeded() override; - virtual void finishUnconditionalStepDown() override; - virtual Date_t getStepDownTime() const; - virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const; - virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const; - virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response); - virtual void summarizeAsHtml(ReplSetHtmlSummary* output); - virtual void loadLastVote(const LastVote& lastVote); - virtual void voteForMyselfV1(); - virtual void setPrimaryIndex(long long primaryIndex); - virtual int getCurrentPrimaryIndex() const; - virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); - virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime, - const ReplSetTagPattern& tagPattern, - bool durablyWritten); - virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, - bool durablyWritten, - bool skipSelf); - virtual bool setMemberAsDown(Date_t now, const int memberIndex) override; - virtual std::pair<int, Date_t> getStalestLiveMember() const; - virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now); - virtual void resetAllMemberTimeouts(Date_t now); - virtual void resetMemberTimeouts(Date_t now, - const stdx::unordered_set<HostAndPort>& member_set); - virtual OpTime getMyLastAppliedOpTime() const; - virtual OpTime getMyLastDurableOpTime() const; - virtual MemberData* getMyMemberData(); - virtual MemberData* findMemberDataByMemberId(const int memberId); - virtual MemberData* findMemberDataByRid(const OID rid); - virtual MemberData* addSlaveMemberData(const OID rid); - virtual 
Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason); - virtual void setStorageEngineSupportsReadCommitted(bool supported); - - virtual void restartHeartbeats(); - - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; - - //////////////////////////////////////////////////////////// - // - // Test support methods - // - //////////////////////////////////////////////////////////// - - // Changes _memberState to newMemberState. Only for testing. - void changeMemberState_forTest(const MemberState& newMemberState, - const Timestamp& electionTime = Timestamp(0, 0)); - - // Sets "_electionTime" to "newElectionTime". Only for testing. - void _setElectionTime(const Timestamp& newElectionTime); - - // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests! - // TODO(spencer): Remove this once we can easily call for an election in unit tests to - // set the current primary. - void _setCurrentPrimaryForTest(int primaryIndex); - - // Returns _electionTime. Only used in unittests. - Timestamp getElectionTime() const; - - // Returns _electionId. Only used in unittests. - OID getElectionId() const; - -private: - enum UnelectableReason { - None = 0, - CannotSeeMajority = 1 << 0, - NotCloseEnoughToLatestOptime = 1 << 1, - ArbiterIAm = 1 << 2, - NotSecondary = 1 << 3, - NoPriority = 1 << 4, - StepDownPeriodActive = 1 << 5, - NoData = 1 << 6, - NotInitialized = 1 << 7, - VotedTooRecently = 1 << 8, - RefusesToStand = 1 << 9, - NotCloseEnoughToLatestForPriorityTakeover = 1 << 10, - NotFreshEnoughForCatchupTakeover = 1 << 11, - }; - typedef int UnelectableReasonMask; - - - // Set what type of PRIMARY this node currently is. - void _setLeaderMode(LeaderMode mode); - - // Returns the number of heartbeat pings which have occurred. - int _getTotalPings(); - - // Returns the current "ping" value for the given member by their address - Milliseconds _getPing(const HostAndPort& host); - - // Determines if we will veto the member specified by "args.id". - // If we veto, the errmsg will be filled in with a reason - bool _shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args, - const Date_t& now, - std::string* errmsg) const; - - // Returns the index of the member with the matching id, or -1 if none match. - int _getMemberIndex(int id) const; - - // Sees if a majority number of votes are held by members who are currently "up" - bool _aMajoritySeemsToBeUp() const; - - // Checks if the node can see a healthy primary of equal or greater priority to the - // candidate. If so, returns the index of that node. Otherwise returns -1. - int _findHealthyPrimaryOfEqualOrGreaterPriority(const int candidateIndex) const; - - // Is otherOpTime close enough (within 10 seconds) to the latest known optime to qualify - // for an election - bool _isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const; - - // Is our optime close enough to the latest known optime to call for a priority takeover. - bool _amIFreshEnoughForPriorityTakeover() const; - - // Is the primary node still in catchup mode and is our optime the latest - // known optime of all the up nodes. 
- bool _amIFreshEnoughForCatchupTakeover() const; - - // Returns reason why "self" member is unelectable - UnelectableReasonMask _getMyUnelectableReason(const Date_t now, - StartElectionReason reason) const; - - // Returns reason why memberIndex is unelectable - UnelectableReasonMask _getUnelectableReason(int memberIndex) const; - - // Returns the nice text of why the node is unelectable - std::string _getUnelectableReasonString(UnelectableReasonMask ur) const; - - // Return true if we are currently primary - bool _iAmPrimary() const; - - // Scans through all members that are 'up' and return the latest known optime. - OpTime _latestKnownOpTime() const; - - // Scans the electable set and returns the highest priority member index - int _getHighestPriorityElectableIndex(Date_t now) const; - - // Returns true if "one" member is higher priority than "two" member - bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const; - - // Helper shortcut to self config - const MemberConfig& _selfConfig() const; - - // Helper shortcut to self member data - const MemberData& _selfMemberData() const; - - // Index of self member in member data. - const int _selfMemberDataIndex() const; - - // Returns NULL if there is no primary, or the MemberConfig* for the current primary - const MemberConfig* _currentPrimaryMember() const; - - /** - * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an - * election or stepdown should commence. - * _updatePrimaryFromHBDataV1() is a simplified version of _updatePrimaryFromHBData() to be used - * when in ProtocolVersion1. - */ - HeartbeatResponseAction _updatePrimaryFromHBData(int updatedConfigIndex, - const MemberState& originalState, - Date_t now); - HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex, - const MemberState& originalState, - Date_t now); - - /** - * Updates _memberData based on the newConfig, ensuring that every member in the newConfig - * has an entry in _memberData. If any nodes in the newConfig are also present in - * _currentConfig, copies their heartbeat info into the corresponding entry in the updated - * _memberData vector. - */ - void _updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); - - /** - * Returns whether a stepdown attempt should be allowed to proceed. See the comment for - * attemptStepDown() for more details on the rules of when stepdown attempts succeed or fail. - */ - bool _canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force); - - void _stepDownSelfAndReplaceWith(int newPrimary); - - /** - * Looks up the provided member in the blacklist and returns true if the member's blacklist - * expire time is after 'now'. If the member is found but the expire time is before 'now', - * the function returns false. If the member is not found in the blacklist, the function - * returns false. - **/ - bool _memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const; - - /** - * Returns true if we are a one-node replica set, we're the one member, - * we're electable, we're not in maintenance mode, and we are currently in followerMode - * SECONDARY. - * - * This is used to decide if we should transition to Role::candidate in a one-node replica set. - */ - bool _isElectableNodeInSingleNodeReplicaSet() const; - - // This node's role in the replication protocol. - Role _role; - - // This is a unique id that is generated and set each time we transition to PRIMARY, as the - // result of an election. 
- OID _electionId;
- // The time at which the current PRIMARY was elected.
- Timestamp _electionTime;
-
- // This node's election term. The term is used as part of the consensus algorithm to elect
- // and maintain one primary (leader) node in the cluster.
- long long _term;
-
- // the index of the member we currently believe is primary, if one exists, otherwise -1
- int _currentPrimaryIndex;
-
- // the hostandport we are currently syncing from
- // empty if no sync source (we are primary, or we cannot connect to anyone yet)
- HostAndPort _syncSource;
- // These members are not chosen as sync sources for a period of time, due to connection
- // issues with them
- std::map<HostAndPort, Date_t> _syncSourceBlacklist;
- // The next sync source to be chosen, requested via a replSetSyncFrom command
- int _forceSyncSourceIndex;
-
- // Options for this TopologyCoordinator
- Options _options;
-
- // "heartbeat message"
- // sent in requestHeartbeat respond in field "hbm"
- std::string _hbmsg;
- Date_t _hbmsgTime; // when it was logged
-
- // heartbeat msg to send to others; descriptive diagnostic info
- std::string _getHbmsg(Date_t now) const;
-
- int _selfIndex; // this node's index in _members and _currentConfig
-
- ReplSetConfig _rsConfig; // The current config, including a vector of MemberConfigs
-
- // Heartbeat, current applied/durable optime, and other state data for each member. It is
- // guaranteed that this vector will be maintained in the same order as the MemberConfigs in
- // _currentConfig, therefore the member config index can be used to index into this vector as
- // well.
- std::vector<MemberData> _memberData;
-
- // Time when stepDown command expires
- Date_t _stepDownUntil;
-
- // A time before which this node will not stand for election.
- // In protocol version 1, this is used to prevent running for election after seeing
- // a new term.
- Date_t _electionSleepUntil;
-
- // OpTime of the latest committed operation.
- OpTime _lastCommittedOpTime;
-
- // OpTime representing our transition to PRIMARY and the start of our term.
- // _lastCommittedOpTime cannot be set to an earlier OpTime.
- OpTime _firstOpTimeOfMyTerm;
-
- // The number of calls we have had to enter maintenance mode
- int _maintenanceModeCalls;
-
- // The sub-mode of follower that we are in. Legal values are RS_SECONDARY, RS_RECOVERING,
- // RS_STARTUP2 (initial sync) and RS_ROLLBACK. Only meaningful if _role == Role::follower.
- // Configured via setFollowerMode(). If the sub-mode is RS_SECONDARY, then the effective
- // sub-mode is either RS_SECONDARY or RS_RECOVERING, depending on _maintenanceModeCalls.
- // Rather than accessing this variable directly, one should use the getMemberState() method,
- // which computes the replica set node state on the fly.
- MemberState::MS _followerMode;
-
- // What type of PRIMARY this node currently is. Don't set this directly, call _setLeaderMode
- // instead.
- LeaderMode _leaderMode = LeaderMode::kNotLeader;
-
- typedef std::map<HostAndPort, PingStats> PingMap;
- // Ping stats for each member by HostAndPort;
- PingMap _pings;
-
- // Last vote info from the election
- struct VoteLease {
- static const Seconds leaseTime;
-
- Date_t when;
- int whoId = -1;
- HostAndPort whoHostAndPort;
- } _voteLease;
-
- // V1 last vote info for elections
- LastVote _lastVote{OpTime::kInitialTerm, -1};
-
- enum class ReadCommittedSupport {
- kUnknown,
- kNo,
- kYes,
- };
-
- // Whether or not the storage engine supports read committed.
- ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_test.cpp
index a22aec9f72f..a22aec9f72f 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_test.cpp
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 048bb17b573..048bb17b573 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
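
Note on the removed declarations above: the electability checks return an UnelectableReasonMask built by OR-ing UnelectableReason bit flags, so several causes can be recorded at once and later rendered as text by _getUnelectableReasonString(). The following standalone C++ sketch is illustrative only and is not MongoDB code: describeUnelectableReasons() is a hypothetical stand-in for that private helper, and only a subset of the flags from the deleted header is shown.

#include <iostream>
#include <sstream>
#include <string>

// Flag values mirror a subset of the UnelectableReason enum removed above.
enum UnelectableReason {
    None = 0,
    CannotSeeMajority = 1 << 0,
    NotCloseEnoughToLatestOptime = 1 << 1,
    ArbiterIAm = 1 << 2,
    NotSecondary = 1 << 3,
    NoPriority = 1 << 4,
    StepDownPeriodActive = 1 << 5,
};
typedef int UnelectableReasonMask;

// Hypothetical stand-in for _getUnelectableReasonString(): turns a mask with
// any number of bits set into a single human-readable string.
std::string describeUnelectableReasons(UnelectableReasonMask ur) {
    if (ur == None) {
        return "electable";
    }
    std::ostringstream os;
    const char* sep = "";
    if (ur & CannotSeeMajority) {
        os << sep << "cannot see a majority of the set";
        sep = "; ";
    }
    if (ur & NotSecondary) {
        os << sep << "not in SECONDARY state";
        sep = "; ";
    }
    if (ur & NoPriority) {
        os << sep << "priority is zero";
        sep = "; ";
    }
    if (ur & StepDownPeriodActive) {
        os << sep << "stepdown period is active";
        sep = "; ";
    }
    return os.str();
}

int main() {
    // Independent electability checks each OR their own bit into the mask.
    UnelectableReasonMask ur = None;
    ur |= CannotSeeMajority;
    ur |= NoPriority;
    std::cout << describeUnelectableReasons(ur) << std::endl;
    // Output: cannot see a majority of the set; priority is zero
    return 0;
}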