Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/repl/topology_coordinator.cpp                                                                            | 3216
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                                                                               |  977
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp                                                                        | 3239
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h                                                                          |  529
-rw-r--r--  src/mongo/db/repl/topology_coordinator_test.cpp (renamed from src/mongo/db/repl/topology_coordinator_impl_test.cpp)    |    0
-rw-r--r--  src/mongo/db/repl/topology_coordinator_v1_test.cpp (renamed from src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp) | 0
6 files changed, 3557 insertions, 4404 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index e79bcad827f..84f4ccbedfe 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -30,53 +30,3209 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/topology_coordinator_impl.h"
-#include <string>
+#include <limits>
-#include "mongo/util/assert_util.h"
+#include "mongo/db/audit.h"
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/heartbeat_response_action.h"
+#include "mongo/db/repl/is_master_response.h"
+#include "mongo/db/repl/isself.h"
+#include "mongo/db/repl/repl_set_heartbeat_args.h"
+#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
+#include "mongo/db/repl/repl_set_heartbeat_response.h"
+#include "mongo/db/repl/repl_set_html_summary.h"
+#include "mongo/db/repl/repl_set_request_votes_args.h"
+#include "mongo/db/repl/rslog.h"
+#include "mongo/db/repl/update_position_args.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/hex.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
+using std::vector;
+const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30);
+
+// Controls how caught up in replication a secondary with higher priority than the current primary
+// must be before it will call for a priority takeover election.
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2);
+
+// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore
+// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be
+// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds.
+MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs);
+
namespace {
-static const int kLeaderValue = 0;
-static const int kFollowerValue = 1;
-static const int kCandidateValue = 2;
+
+template <typename T>
+int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
+ return static_cast<int>(it - vec.begin());
+}
+
+// Maximum number of retries for a failed heartbeat.
+const int kMaxHeartbeatRetries = 2;
+
+/**
+ * Returns true if the only up heartbeats are auth errors.
+ */
+bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) {
+ bool foundAuthError = false;
+ for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) {
+ if (indexOfIterator(hbdata, it) == selfIndex) {
+ continue;
+ }
+
+ if (it->up()) {
+ return false;
+ }
+
+ if (it->hasAuthIssue()) {
+ foundAuthError = true;
+ }
+ }
+
+ return foundAuthError;
+}
+
+void appendOpTime(BSONObjBuilder* bob,
+ const char* elemName,
+ const OpTime& opTime,
+ const long long pv) {
+ if (pv == 1) {
+ opTime.append(bob, elemName);
+ } else {
+ bob->append(elemName, opTime.getTimestamp());
+ }
+}
} // namespace
-const TopologyCoordinator::Role TopologyCoordinator::Role::leader(kLeaderValue);
-const TopologyCoordinator::Role TopologyCoordinator::Role::follower(kFollowerValue);
-const TopologyCoordinator::Role TopologyCoordinator::Role::candidate(kCandidateValue);
+void PingStats::start(Date_t now) {
+ _lastHeartbeatStartDate = now;
+ _numFailuresSinceLastStart = 0;
+}
+
+void PingStats::hit(Milliseconds millis) {
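+ // A successful response ends the current retry cycle, so saturate the failure count;
+ // the next heartbeat to this target will then begin a fresh cycle via start().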
+ _numFailuresSinceLastStart = std::numeric_limits<int>::max();
+ ++count;
+
+ value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5);
+}
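The smoothing in hit() is an exponentially weighted moving average that gives the newest round-trip sample a weight of 1/5, so a single latency spike nudges the estimate instead of replacing it. Below is a minimal stand-alone sketch of the same idea, using std::chrono in place of the server's Milliseconds type; the PingAverage name is illustrative, not something from this tree.

    #include <chrono>
    #include <iostream>

    // Illustrative EWMA in the spirit of PingStats::hit():
    // new_value = (old_value * 4 + sample) / 5, seeded with the first sample.
    class PingAverage {
    public:
        void addSample(std::chrono::milliseconds sample) {
            if (!_initialized) {
                _value = sample;
                _initialized = true;
            } else {
                _value = (_value * 4 + sample) / 5;
            }
        }
        std::chrono::milliseconds value() const { return _value; }

    private:
        bool _initialized = false;
        std::chrono::milliseconds _value{0};
    };

    int main() {
        PingAverage avg;
        for (auto ms : {100, 100, 500, 100}) {           // a latency spike in the middle
            avg.addSample(std::chrono::milliseconds(ms));
            std::cout << avg.value().count() << "ms\n";  // the spike is damped, not adopted outright
        }
    }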
+
+void PingStats::miss() {
+ ++_numFailuresSinceLastStart;
+}
+
+TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options)
+ : _role(Role::follower),
+ _term(OpTime::kUninitializedTerm),
+ _currentPrimaryIndex(-1),
+ _forceSyncSourceIndex(-1),
+ _options(std::move(options)),
+ _selfIndex(-1),
+ _maintenanceModeCalls(0),
+ _followerMode(MemberState::RS_STARTUP2) {
+ invariant(getMemberState() == MemberState::RS_STARTUP);
+ // Need an entry for self in _memberData.
+ _memberData.emplace_back();
+ _memberData.back().setIsSelf(true);
+}
+
+TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const {
+ return _role;
+}
+
+void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
+ invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
+ _forceSyncSourceIndex = index;
+}
+
+HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
+ return _syncSource;
+}
+
+HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ChainingPreference chainingPreference) {
+ // If we are not a member of the current replica set configuration, no sync source is valid.
+ if (_selfIndex == -1) {
+ LOG(1) << "Cannot sync from any members because we are not in the replica set config";
+ return HostAndPort();
+ }
+
+ // if we have a target we've requested to sync from, use it
+ if (_forceSyncSourceIndex != -1) {
+ invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
+ _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
+ _forceSyncSourceIndex = -1;
+ log() << "choosing sync source candidate by request: " << _syncSource;
+ std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
+ << " by request");
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
+ }
+
+ // wait for 2N pings (not counting ourselves) before choosing a sync target
+ int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings();
+
+ if (needMorePings > 0) {
+ OCCASIONALLY log() << "waiting for " << needMorePings
+ << " pings from other members before syncing";
+ _syncSource = HostAndPort();
+ return _syncSource;
+ }
+
+ // If we are only allowed to sync from the primary, set that
+ if (chainingPreference == ChainingPreference::kUseConfiguration &&
+ !_rsConfig.isChainingAllowed()) {
+ if (_currentPrimaryIndex == -1) {
+ LOG(1) << "Cannot select a sync source because chaining is"
+ " not allowed and primary is unknown/down";
+ _syncSource = HostAndPort();
+ return _syncSource;
+ } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
+ LOG(1) << "Cannot select a sync source because chaining is not allowed and primary "
+ "member is blacklisted: "
+ << _currentPrimaryMember()->getHostAndPort();
+ _syncSource = HostAndPort();
+ return _syncSource;
+ } else if (_currentPrimaryIndex == _selfIndex) {
+ LOG(1)
+ << "Cannot select a sync source because chaining is not allowed and we are primary";
+ _syncSource = HostAndPort();
+ return _syncSource;
+ } else {
+ _syncSource = _currentPrimaryMember()->getHostAndPort();
+ log() << "chaining not allowed, choosing primary as sync source candidate: "
+ << _syncSource;
+ std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
+ }
+ }
+
+ // find the member with the lowest ping time that is ahead of me
+
+ // choose a time that will exclude no candidates by default, in case we don't see a primary
+ OpTime oldestSyncOpTime;
+
+ // Find primary's oplog time. Reject sync candidates that are more than
+ // _options.maxSyncSourceLagSecs seconds behind.
+ if (_currentPrimaryIndex != -1) {
+ OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime();
+
+ // Check if primaryOpTime is still close to 0 because we haven't received
+ // our first heartbeat from a new primary yet.
+ unsigned int maxLag =
+ static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs));
+ if (primaryOpTime.getSecs() >= maxLag) {
+ oldestSyncOpTime =
+ OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm());
+ }
+ }
+
+ int closestIndex = -1;
+
+ // Make two attempts, with less restrictive rules the second time.
+ //
+ // During the first attempt, we ignore those nodes that have a larger slave
+ // delay, hidden nodes or non-voting, and nodes that are excessively behind.
+ //
+ // For the second attempt include those nodes, in case those are the only ones we can reach.
+ //
+ // This loop attempts to set 'closestIndex', to select a viable candidate.
+ for (int attempts = 0; attempts < 2; ++attempts) {
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin();
+ it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ // Don't consider ourselves.
+ if (itIndex == _selfIndex) {
+ continue;
+ }
+
+ const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));
+
+ // Candidate must be up to be considered.
+ if (!it->up()) {
+ LOG(2) << "Cannot select sync source because it is not up: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ // Candidate must be PRIMARY or SECONDARY state to be considered.
+ if (!it->getState().readable()) {
+ LOG(2) << "Cannot select sync source because it is not readable: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+
+ // On the first attempt, we skip candidates that do not match these criteria.
+ if (attempts == 0) {
+ // Candidate must be a voter if we are a voter.
+ if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) {
+ LOG(2) << "Cannot select sync source because we are a voter and it is not: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ // Candidates must not be hidden.
+ if (itMemberConfig.isHidden()) {
+ LOG(2) << "Cannot select sync source because it is hidden: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ // Candidates cannot be excessively behind.
+ if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) {
+ LOG(2) << "Cannot select sync source because it is too far behind."
+ << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort()
+ << ": " << it->getHeartbeatAppliedOpTime()
+ << ", oldest acceptable optime: " << oldestSyncOpTime;
+ continue;
+ }
+ // Candidate must not have a configured delay larger than ours.
+ if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) {
+ LOG(2) << "Cannot select sync source with larger slaveDelay than ours: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ }
+ // Candidate must build indexes if we build indexes, to be considered.
+ if (_selfConfig().shouldBuildIndexes()) {
+ if (!itMemberConfig.shouldBuildIndexes()) {
+ LOG(2) << "Cannot select sync source with shouldBuildIndex differences: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ }
+ // only consider candidates that are ahead of where we are
+ if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
+ LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
+ << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
+ << ", latest oplog optime of sync candidate "
+ << itMemberConfig.getHostAndPort() << ": "
+ << it->getHeartbeatAppliedOpTime().toBSON();
+ continue;
+ }
+ // Candidate cannot be more latent than anything we've already considered.
+ if ((closestIndex != -1) &&
+ (_getPing(itMemberConfig.getHostAndPort()) >
+ _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
+ LOG(2) << "Cannot select sync source with higher latency than the best candidate: "
+ << itMemberConfig.getHostAndPort();
+
+ continue;
+ }
+ // Candidate cannot be blacklisted.
+ if (_memberIsBlacklisted(itMemberConfig, now)) {
+ LOG(1) << "Cannot select sync source which is blacklisted: "
+ << itMemberConfig.getHostAndPort();
+
+ continue;
+ }
+ // This candidate has passed all tests; set 'closestIndex'
+ closestIndex = itIndex;
+ }
+ if (closestIndex != -1)
+ break; // no need for second attempt
+ }
+
+ if (closestIndex == -1) {
+ // Did not find any members to sync from
+ std::string msg("could not find member to sync from");
+ // Only log when we had a valid sync source before
+ if (!_syncSource.empty()) {
+ log() << msg << rsLog;
+ }
+ setMyHeartbeatMessage(now, msg);
+
+ _syncSource = HostAndPort();
+ return _syncSource;
+ }
+ _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
+ log() << "sync source candidate: " << _syncSource;
+ std::string msg(str::stream() << "syncing from: " << _syncSource.toString());
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
+}
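The two-attempt loop in chooseNewSyncSource() above is the heart of sync source selection: hard requirements (up, readable, ahead of us, not blacklisted, acceptable latency) apply on both passes, while the softer ones (voter, hidden, lag window, slave delay) are enforced only on the first pass and relaxed if no candidate survives. The sketch below isolates that relax-on-retry pattern with a hypothetical Candidate struct standing in for MemberConfig/MemberData; none of these names come from the tree.

    #include <optional>
    #include <string>
    #include <vector>

    // Hypothetical, simplified stand-in for a sync source candidate.
    struct Candidate {
        std::string host;
        bool up;
        bool readable;        // PRIMARY or SECONDARY
        bool hidden;
        bool voter;
        long long appliedOp;  // stand-in for the last applied optime
    };

    // Pick the first acceptable candidate, trying strict criteria first and
    // relaxing the optional checks (hidden/voter) on the second pass,
    // mirroring the relax-on-retry structure of the loop above.
    std::optional<Candidate> pickSyncSource(const std::vector<Candidate>& candidates,
                                            long long myLastFetchedOp,
                                            bool iAmVoter) {
        for (int attempt = 0; attempt < 2; ++attempt) {
            for (const auto& c : candidates) {
                if (!c.up || !c.readable)
                    continue;                 // hard requirements, both passes
                if (attempt == 0) {
                    if (c.hidden)
                        continue;             // soft requirements, first pass only
                    if (iAmVoter && !c.voter)
                        continue;
                }
                if (c.appliedOp <= myLastFetchedOp)
                    continue;                 // must be ahead of us
                return c;
            }
        }
        return std::nullopt;
    }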
+
+bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig,
+ Date_t now) const {
+ std::map<HostAndPort, Date_t>::const_iterator blacklisted =
+ _syncSourceBlacklist.find(memberConfig.getHostAndPort());
+ if (blacklisted != _syncSourceBlacklist.end()) {
+ if (blacklisted->second > now) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ LOG(2) << "blacklisting " << host << " until " << until.toString();
+ _syncSourceBlacklist[host] = until;
+}
+
+void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
+ std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
+ if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
+ LOG(2) << "unblacklisting " << host;
+ _syncSourceBlacklist.erase(hostItr);
+ }
+}
+
+void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
+ _syncSourceBlacklist.clear();
+}
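The blacklist itself is nothing more than a host-to-expiry map: an entry blocks a host only while its 'until' time is still in the future, and unblacklistSyncSource() removes it lazily once that time has passed. A tiny sketch of that contract, with std::string and std::chrono standing in for HostAndPort and Date_t:

    #include <chrono>
    #include <map>
    #include <string>

    using Clock = std::chrono::steady_clock;

    // Illustrative blacklist with the same semantics as the methods above.
    class SyncSourceBlacklist {
    public:
        void blacklist(const std::string& host, Clock::time_point until) {
            _entries[host] = until;
        }
        void unblacklist(const std::string& host, Clock::time_point now) {
            auto it = _entries.find(host);
            if (it != _entries.end() && now >= it->second)
                _entries.erase(it);  // expired entries are removed lazily
        }
        bool isBlacklisted(const std::string& host, Clock::time_point now) const {
            auto it = _entries.find(host);
            return it != _entries.end() && it->second > now;
        }

    private:
        std::map<std::string, Clock::time_point> _entries;
    };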
+
+void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
+ BSONObjBuilder* response,
+ Status* result) {
+ response->append("syncFromRequested", target.toString());
+
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync");
+ return;
+ }
+
+ const MemberConfig& selfConfig = _selfConfig();
+ if (selfConfig.isArbiter()) {
+ *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
+ return;
+ }
+ if (_selfIndex == _currentPrimaryIndex) {
+ *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
+ return;
+ }
+
+ ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
+ int targetIndex = 0;
+ for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
+ ++it) {
+ if (it->getHostAndPort() == target) {
+ targetConfig = it;
+ break;
+ }
+ ++targetIndex;
+ }
+ if (targetConfig == _rsConfig.membersEnd()) {
+ *result = Status(ErrorCodes::NodeNotFound,
+ str::stream() << "Could not find member \"" << target.toString()
+ << "\" in replica set");
+ return;
+ }
+ if (targetIndex == _selfIndex) {
+ *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
+ return;
+ }
+ if (targetConfig->isArbiter()) {
+ *result = Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot sync from \"" << target.toString()
+ << "\" because it is an arbiter");
+ return;
+ }
+ if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
+ *result = Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot sync from \"" << target.toString()
+ << "\" because it does not build indexes");
+ return;
+ }
+
+ if (selfConfig.isVoter() && !targetConfig->isVoter()) {
+ *result = Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot sync from \"" << target.toString()
+ << "\" because it is not a voter");
+ return;
+ }
+
+ const MemberData& hbdata = _memberData.at(targetIndex);
+ if (hbdata.hasAuthIssue()) {
+ *result =
+ Status(ErrorCodes::Unauthorized,
+ str::stream() << "not authorized to communicate with " << target.toString());
+ return;
+ }
+ if (hbdata.getHealth() == 0) {
+ *result =
+ Status(ErrorCodes::HostUnreachable,
+ str::stream() << "I cannot reach the requested member: " << target.toString());
+ return;
+ }
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
+ warning() << "attempting to sync from " << target << ", but its latest opTime is "
+ << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is "
+ << lastOpApplied.getSecs() << " so this may not work";
+ response->append("warning",
+ str::stream() << "requested member \"" << target.toString()
+ << "\" is more than 10 seconds behind us");
+ // not returning bad Status, just warning
+ }
+
+ HostAndPort prevSyncSource = getSyncSourceAddress();
+ if (!prevSyncSource.empty()) {
+ response->append("prevSyncTarget", prevSyncSource.toString());
+ }
+
+ setForceSyncSourceIndex(targetIndex);
+ *result = Status::OK();
+}
+
+void TopologyCoordinatorImpl::prepareFreshResponse(
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t now,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (_rsConfig.getProtocolVersion() != 0) {
+ *result = Status(ErrorCodes::BadValue,
+ str::stream() << "replset: incompatible replset protocol version: "
+ << _rsConfig.getProtocolVersion());
+ return;
+ }
+
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::ReplicaSetNotFound,
+ "Cannot participate in elections because not initialized");
+ return;
+ }
+
+ if (args.setName != _rsConfig.getReplSetName()) {
+ *result =
+ Status(ErrorCodes::ReplicaSetNotFound,
+ str::stream() << "Wrong repl set name. Expected: " << _rsConfig.getReplSetName()
+ << ", received: "
+ << args.setName);
+ return;
+ }
+
+ if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
+ *result = Status(ErrorCodes::BadValue,
+ str::stream() << "Received replSetFresh command from member with the "
+ "same member ID as ourself: "
+ << args.id);
+ return;
+ }
+
+ bool weAreFresher = false;
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ if (_rsConfig.getConfigVersion() > args.cfgver) {
+ log() << "replSet member " << args.who << " is not yet aware its cfg version "
+ << args.cfgver << " is stale";
+ response->append("info", "config version stale");
+ weAreFresher = true;
+ }
+ // check not only our own optime, but any other member we can reach
+ else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) {
+ weAreFresher = true;
+ }
+ response->appendDate("opTime",
+ Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
+ response->append("fresher", weAreFresher);
+
+ std::string errmsg;
+ bool doVeto = _shouldVetoMember(args, now, &errmsg);
+ response->append("veto", doVeto);
+ if (doVeto) {
+ response->append("errmsg", errmsg);
+ }
+ *result = Status::OK();
+}
+
+bool TopologyCoordinatorImpl::_shouldVetoMember(
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t& now,
+ std::string* errmsg) const {
+ if (_rsConfig.getConfigVersion() < args.cfgver) {
+ // We are stale; do not veto.
+ return false;
+ }
+
+ const unsigned int memberID = args.id;
+ const int hopefulIndex = _getMemberIndex(memberID);
+ invariant(hopefulIndex != _selfIndex);
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
+
+ if (hopefulIndex == -1) {
+ *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
+ return true;
+ }
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ if (_iAmPrimary() &&
+ lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) {
+ // hbinfo is not updated for ourself, so if we are primary we have to check the
+ // primary's last optime separately
+ *errmsg = str::stream() << "I am already primary, "
+ << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " can try again once I've stepped down";
+ return true;
+ }
+
+ if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
+ (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >=
+ _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) {
+ // other members might be aware of more up-to-date nodes
+ *errmsg =
+ str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " is trying to elect itself but "
+ << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
+ << " is already primary and more up-to-date";
+ return true;
+ }
+
+ if ((highestPriorityIndex != -1)) {
+ const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
+ const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
+
+ if (priorityMember.getPriority() > hopefulMember.getPriority()) {
+ *errmsg = str::stream() << hopefulMember.getHostAndPort().toString()
+ << " has lower priority of " << hopefulMember.getPriority()
+ << " than " << priorityMember.getHostAndPort().toString()
+ << " which has a priority of " << priorityMember.getPriority();
+ return true;
+ }
+ }
+
+ UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex);
+ reason &= ~RefusesToStand;
+ if (reason) {
+ *errmsg = str::stream() << "I don't think "
+ << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " is electable because the "
+ << _getUnelectableReasonString(reason);
+ return true;
+ }
+
+ return false;
+}
+
+// produce a reply to a received electCmd
+void TopologyCoordinatorImpl::prepareElectResponse(
+ const ReplicationCoordinator::ReplSetElectArgs& args,
+ const Date_t now,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (_rsConfig.getProtocolVersion() != 0) {
+ *result = Status(ErrorCodes::BadValue,
+ str::stream() << "replset: incompatible replset protocol version: "
+ << _rsConfig.getProtocolVersion());
+ return;
+ }
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::ReplicaSetNotFound,
+ "Cannot participate in election because not initialized");
+ return;
+ }
+
+ const long long myver = _rsConfig.getConfigVersion();
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
+
+ const MemberConfig* primary = _currentPrimaryMember();
+ const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
+ const MemberConfig* highestPriority =
+ highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex);
+
+ int vote = 0;
+ if (args.set != _rsConfig.getReplSetName()) {
+ log() << "replSet error received an elect request for '" << args.set
+ << "' but our set name is '" << _rsConfig.getReplSetName() << "'";
+ } else if (myver < args.cfgver) {
+ // we are stale. don't vote
+ log() << "replSetElect not voting because our config version is stale. Our version: "
+ << myver << ", their version: " << args.cfgver;
+ } else if (myver > args.cfgver) {
+ // they are stale!
+ log() << "replSetElect command received stale config version # during election. "
+ "Our version: "
+ << myver << ", their version: " << args.cfgver;
+ vote = -10000;
+ } else if (!hopeful) {
+ log() << "replSetElect couldn't find member with id " << args.whoid;
+ vote = -10000;
+ } else if (_iAmPrimary()) {
+ log() << "I am already primary, " << hopeful->getHostAndPort().toString()
+ << " can try again once I've stepped down";
+ vote = -10000;
+ } else if (primary) {
+ log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
+ << primary->getHostAndPort().toString() << " is already primary";
+ vote = -10000;
+ } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
+ // TODO(spencer): What if the lower-priority member is more up-to-date?
+ log() << hopeful->getHostAndPort().toString() << " has lower priority than "
+ << highestPriority->getHostAndPort().toString();
+ vote = -10000;
+ } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) {
+ log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for "
+ << _voteLease.whoHostAndPort.toString() << ' '
+ << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
+ } else {
+ _voteLease.when = now;
+ _voteLease.whoId = args.whoid;
+ _voteLease.whoHostAndPort = hopeful->getHostAndPort();
+ vote = _selfConfig().getNumVotes();
+ invariant(hopeful->getId() == args.whoid);
+ if (vote > 0) {
+ log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " ("
+ << args.whoid << ')';
+ }
+ }
+
+ response->append("vote", vote);
+ response->append("round", args.round);
+ *result = Status::OK();
+}
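The "voted for ... secs ago" branch above is backed by the vote lease declared at the top of this file (VoteLease::leaseTime = 30 seconds): having voted for one candidate, a node refuses to vote for a different one until the lease expires. A minimal sketch of that rule, with std::chrono standing in for Date_t and the names chosen for illustration only:

    #include <chrono>

    using Clock = std::chrono::steady_clock;

    // Illustrative vote lease: once we vote for a candidate, we refuse to vote
    // for anyone else until leaseTime has elapsed, mirroring the check in
    // prepareElectResponse() above.
    struct VoteLease {
        static constexpr std::chrono::seconds leaseTime{30};
        Clock::time_point when{};
        int whoId = -1;
    };

    bool mayVoteFor(const VoteLease& lease, int candidateId, Clock::time_point now) {
        if (lease.whoId == -1)
            return true;                                  // never voted before
        if (lease.whoId == candidateId)
            return true;                                  // renewing the same vote is fine
        return now >= lease.when + VoteLease::leaseTime;  // otherwise wait out the lease
    }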
+
+// produce a reply to a heartbeat
+Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
+ const ReplSetHeartbeatArgs& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response) {
+ if (args.getProtocolVersion() != 1) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "replset: incompatible replset protocol version: "
+ << args.getProtocolVersion());
+ }
+
+ // Verify that replica set names match
+ const std::string rshb = args.getSetName();
+ if (ourSetName != rshb) {
+ log() << "replSet set names do not match, ours: " << ourSetName
+ << "; remote node's: " << rshb;
+ response->noteMismatched();
+ return Status(ErrorCodes::InconsistentReplicaSetNames,
+ str::stream() << "Our set name of " << ourSetName << " does not match name "
+ << rshb
+ << " reported by remote node");
+ }
+
+ const MemberState myState = getMemberState();
+ if (_selfIndex == -1) {
+ if (myState.removed()) {
+ return Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set configuration is invalid or does not include us");
+ }
+ } else {
+ invariant(_rsConfig.getReplSetName() == args.getSetName());
+ if (args.getSenderId() == _selfConfig().getId()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Received heartbeat from member with the same "
+ "member ID as ourself: "
+ << args.getSenderId());
+ }
+ }
+
+ // This is a replica set
+ response->noteReplSet();
+
+ response->setSetName(ourSetName);
+ response->setState(myState.s);
+ if (myState.primary()) {
+ response->setElectionTime(_electionTime);
+ }
+
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ const OpTime lastOpDurable = getMyLastDurableOpTime();
+
+ // Are we electable
+ response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout));
+
+ // Heartbeat status message
+ response->setHbMsg(_getHbmsg(now));
+ response->setTime(duration_cast<Seconds>(now - Date_t{}));
+ response->setAppliedOpTime(lastOpApplied);
+ response->setDurableOpTime(lastOpDurable);
+
+ if (!_syncSource.empty()) {
+ response->setSyncingTo(_syncSource);
+ }
+
+ if (!_rsConfig.isInitialized()) {
+ response->setConfigVersion(-2);
+ return Status::OK();
+ }
+
+ const long long v = _rsConfig.getConfigVersion();
+ response->setConfigVersion(v);
+ // Deliver new config if caller's version is older than ours
+ if (v > args.getConfigVersion()) {
+ response->setConfig(_rsConfig);
+ }
+
+ // Resolve the caller's id in our Member list
+ int from = -1;
+ if (v == args.getConfigVersion() && args.getSenderId() != -1) {
+ from = _getMemberIndex(args.getSenderId());
+ }
+ if (from == -1) {
+ // Can't find the member, so we leave out the stateDisagreement field
+ return Status::OK();
+ }
+ invariant(from != _selfIndex);
+
+ // if we thought that this node is down, let it know
+ if (!_memberData.at(from).up()) {
+ response->noteStateDisagreement();
+ }
+
+ // note that we got a heartbeat from this node
+ _memberData.at(from).setLastHeartbeatRecv(now);
+ return Status::OK();
+}
+
+Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
+ const ReplSetHeartbeatArgsV1& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response) {
+ // Verify that replica set names match
+ const std::string rshb = args.getSetName();
+ if (ourSetName != rshb) {
+ log() << "replSet set names do not match, ours: " << ourSetName
+ << "; remote node's: " << rshb;
+ return Status(ErrorCodes::InconsistentReplicaSetNames,
+ str::stream() << "Our set name of " << ourSetName << " does not match name "
+ << rshb
+ << " reported by remote node");
+ }
+
+ const MemberState myState = getMemberState();
+ if (_selfIndex == -1) {
+ if (myState.removed()) {
+ return Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set configuration is invalid or does not include us");
+ }
+ } else {
+ if (args.getSenderId() == _selfConfig().getId()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Received heartbeat from member with the same "
+ "member ID as ourself: "
+ << args.getSenderId());
+ }
+ }
+
+ response->setSetName(ourSetName);
+
+ response->setState(myState.s);
+
+ if (myState.primary()) {
+ response->setElectionTime(_electionTime);
+ }
+
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ const OpTime lastOpDurable = getMyLastDurableOpTime();
+ response->setAppliedOpTime(lastOpApplied);
+ response->setDurableOpTime(lastOpDurable);
+
+ if (_currentPrimaryIndex != -1) {
+ response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
+ }
+
+ response->setTerm(_term);
+
+ if (!_syncSource.empty()) {
+ response->setSyncingTo(_syncSource);
+ }
+
+ if (!_rsConfig.isInitialized()) {
+ response->setConfigVersion(-2);
+ return Status::OK();
+ }
+
+ const long long v = _rsConfig.getConfigVersion();
+ response->setConfigVersion(v);
+ // Deliver new config if caller's version is older than ours
+ if (v > args.getConfigVersion()) {
+ response->setConfig(_rsConfig);
+ }
+
+ // Resolve the caller's id in our Member list
+ int from = -1;
+ if (v == args.getConfigVersion() && args.getSenderId() != -1) {
+ from = _getMemberIndex(args.getSenderId());
+ }
+ if (from == -1) {
+ return Status::OK();
+ }
+ invariant(from != _selfIndex);
+
+ // note that we got a heartbeat from this node
+ _memberData.at(from).setLastHeartbeatRecv(now);
+ return Status::OK();
+}
+
+int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
+ int index = 0;
+ for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
+ ++it, ++index) {
+ if (it->getId() == id) {
+ return index;
+ }
+ }
+ return -1;
+}
+
+std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest(
+ Date_t now, const std::string& ourSetName, const HostAndPort& target) {
+ PingStats& hbStats = _pings[target];
+ Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
+ if (!_rsConfig.isInitialized() ||
+ (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
+ (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
+ // This is either the first request ever for "target", or the heartbeat timeout has
+ // passed, so we're starting a "new" heartbeat.
+ hbStats.start(now);
+ alreadyElapsed = Milliseconds(0);
+ }
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setCheckEmpty(false);
+ if (_rsConfig.isInitialized()) {
+ hbArgs.setSetName(_rsConfig.getReplSetName());
+ hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
+ if (_selfIndex >= 0) {
+ const MemberConfig& me = _selfConfig();
+ hbArgs.setSenderHost(me.getHostAndPort());
+ hbArgs.setSenderId(me.getId());
+ }
+ } else {
+ hbArgs.setSetName(ourSetName);
+ hbArgs.setConfigVersion(-2);
+ }
+ if (serverGlobalParams.featureCompatibility.getVersion() !=
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
+ hbArgs.setHeartbeatVersion(1);
+ }
+
+ const Milliseconds timeoutPeriod(
+ _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
+ : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
+ const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
+ return std::make_pair(hbArgs, timeout);
+}
+
+std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1(
+ Date_t now, const std::string& ourSetName, const HostAndPort& target) {
+ PingStats& hbStats = _pings[target];
+ Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
+ if (!_rsConfig.isInitialized() ||
+ (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
+ (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
+ // This is either the first request ever for "target", or the heartbeat timeout has
+ // passed, so we're starting a "new" heartbeat.
+ hbStats.start(now);
+ alreadyElapsed = Milliseconds(0);
+ }
+ ReplSetHeartbeatArgsV1 hbArgs;
+ if (_rsConfig.isInitialized()) {
+ hbArgs.setSetName(_rsConfig.getReplSetName());
+ hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
+ if (_selfIndex >= 0) {
+ const MemberConfig& me = _selfConfig();
+ hbArgs.setSenderId(me.getId());
+ hbArgs.setSenderHost(me.getHostAndPort());
+ }
+ hbArgs.setTerm(_term);
+ } else {
+ hbArgs.setSetName(ourSetName);
+ // Config version -2 is for uninitialized config.
+ hbArgs.setConfigVersion(-2);
+ hbArgs.setTerm(OpTime::kInitialTerm);
+ }
+ if (serverGlobalParams.featureCompatibility.getVersion() !=
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
+ hbArgs.setHeartbeatVersion(1);
+ }
+
+ const Milliseconds timeoutPeriod(
+ _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
+ : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
+ const Milliseconds timeout(timeoutPeriod - alreadyElapsed);
+ return std::make_pair(hbArgs, timeout);
+}
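Both heartbeat request builders share the same retry-budget arithmetic: if the previous attempt for this target is still live (retries remaining and the timeout period not yet exhausted), the new request only gets the remaining slice of the timeout; otherwise the cycle restarts with the full period. A sketch of just that calculation, using std::chrono::milliseconds and illustrative names:

    #include <chrono>

    using namespace std::chrono_literals;

    // Illustrative remaining-timeout calculation: a retried heartbeat only gets
    // whatever is left of the original timeout period; a fresh cycle gets it all.
    std::chrono::milliseconds remainingHeartbeatTimeout(std::chrono::milliseconds timeoutPeriod,
                                                        std::chrono::milliseconds alreadyElapsed,
                                                        int failuresSinceStart,
                                                        int maxRetries) {
        if (failuresSinceStart > maxRetries || alreadyElapsed >= timeoutPeriod) {
            alreadyElapsed = 0ms;  // starting a new heartbeat: full timeout again
        }
        return timeoutPeriod - alreadyElapsed;
    }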
+
+HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
+ Date_t now,
+ Milliseconds networkRoundTripTime,
+ const HostAndPort& target,
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
+ const MemberState originalState = getMemberState();
+ PingStats& hbStats = _pings[target];
+ invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
+ const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
+ (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
+ if (!hbResponse.isOK()) {
+ if (isUnauthorized) {
+ hbStats.hit(networkRoundTripTime);
+ } else {
+ hbStats.miss();
+ }
+ } else {
+ hbStats.hit(networkRoundTripTime);
+ // Log diagnostics.
+ if (hbResponse.getValue().isStateDisagreement()) {
+ LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
+ }
+ }
+
+ // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order
+ // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to
+ // see the majority of the cluster.
+ //
+ // Arbiters also decrease their heartbeat interval to at most half the election timeout period.
+ Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval();
+ if (_rsConfig.getProtocolVersion() == 1) {
+ if (getMemberState().arbiter()) {
+ heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
+ _rsConfig.getHeartbeatInterval());
+ } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) {
+ heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
+ _rsConfig.getHeartbeatInterval() / 4);
+ }
+ }
+
+ const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
+ Date_t nextHeartbeatStartDate;
+ // Determine next heartbeat start time.
+ if (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries &&
+ alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod()) {
+ // There are still retries left, let's use one.
+ nextHeartbeatStartDate = now;
+ } else {
+ nextHeartbeatStartDate = now + heartbeatInterval;
+ }
+
+ if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
+ const long long currentConfigVersion =
+ _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
+ const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
+ if (newConfig.getConfigVersion() > currentConfigVersion) {
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+ } else {
+ // Could be we got the newer version before we got the response, or the
+ // target erroneously sent us one, even though it isn't newer.
+ if (newConfig.getConfigVersion() < currentConfigVersion) {
+ LOG(1) << "Config version from heartbeat was older than ours.";
+ } else {
+ LOG(2) << "Config from heartbeat response was same as ours.";
+ }
+ if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
+ ::mongo::LogstreamBuilder::severityCast(2))) {
+ LogstreamBuilder lsb = log();
+ if (_rsConfig.isInitialized()) {
+ lsb << "Current config: " << _rsConfig.toBSON() << "; ";
+ }
+ lsb << "Config in heartbeat: " << newConfig.toBSON();
+ }
+ }
+ }
+
+ // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
+ // so return early.
+ if (!_rsConfig.isInitialized()) {
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+ }
+ // If we're not in the config, we don't need to respond to heartbeats.
+ if (_selfIndex == -1) {
+ LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target
+ << " -- current config: " << _rsConfig.toBSON();
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+ }
+ const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
+ if (memberIndex == -1) {
+ LOG(1) << "Could not find " << target << " in current config so ignoring --"
+ " current config: "
+ << _rsConfig.toBSON();
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+ }
+
+ invariant(memberIndex != _selfIndex);
+
+ MemberData& hbData = _memberData.at(memberIndex);
+ const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
+ bool advancedOpTime = false;
+ if (!hbResponse.isOK()) {
+ if (isUnauthorized) {
+ hbData.setAuthIssue(now);
+ } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries ||
+ alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod()) {
+ hbData.setDownValues(now, hbResponse.getStatus().reason());
+ } else {
+ LOG(3) << "Bad heartbeat response from " << target << "; trying again; Retries left: "
+ << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; "
+ << alreadyElapsed << " have already elapsed";
+ }
+ } else {
+ ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
+ LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
+ << ", msg: " << hbr.getHbMsg();
+ advancedOpTime = hbData.setUpValues(now, std::move(hbr));
+ }
+
+ HeartbeatResponseAction nextAction;
+ if (_rsConfig.getProtocolVersion() == 0) {
+ nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now);
+ } else {
+ nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
+ }
+
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ nextAction.setAdvancedOpTime(advancedOpTime);
+ return nextAction;
+}
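The interval selection near the top of processHeartbeatResponse() (faster heartbeats for arbiters and for non-primary nodes without a sync source) reduces to a small min() expression. A sketch of only that selection, mirroring the protocol-version-1 branch with illustrative parameter names:

    #include <algorithm>
    #include <chrono>

    // Illustrative heartbeat interval selection: arbiters cap the interval at half
    // the election timeout; a non-primary node with no sync source heartbeats even
    // faster so it can find a sync source quickly.
    std::chrono::milliseconds pickHeartbeatInterval(std::chrono::milliseconds baseInterval,
                                                    std::chrono::milliseconds electionTimeout,
                                                    bool isArbiter,
                                                    bool isPrimary,
                                                    bool hasSyncSource) {
        if (isArbiter)
            return std::min(electionTimeout / 2, baseInterval);
        if (!hasSyncSource && !isPrimary)
            return std::min(electionTimeout / 2, baseInterval / 4);
        return baseInterval;
    }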
+
+bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
+ int numNodes,
+ bool durablyWritten) {
+ // Replication progress that is for some reason ahead of us should not allow us to
+ // satisfy a write concern if we aren't caught up ourselves.
+ OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
+ if (myOpTime < targetOpTime) {
+ return false;
+ }
+
+ for (auto&& memberData : _memberData) {
+ const OpTime& memberOpTime =
+ durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
+ if (memberOpTime >= targetOpTime) {
+ --numNodes;
+ }
+
+ if (numNodes <= 0) {
+ return true;
+ }
+ }
+ return false;
+}
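haveNumNodesReachedOpTime() is essentially "never report a write concern satisfied unless we have reached the target ourselves, then count members at or past it until numNodes is exhausted". A compact sketch of that counting rule with plain integers standing in for OpTime:

    #include <vector>

    // Illustrative version of the counting rule above: progress ahead of us does
    // not count if we lag, and we need numNodes members at or past the target.
    bool enoughNodesReached(long long myOpTime,
                            const std::vector<long long>& memberOpTimes,  // includes self
                            long long targetOpTime,
                            int numNodes) {
        if (myOpTime < targetOpTime)
            return false;
        for (long long opTime : memberOpTimes) {
            if (opTime >= targetOpTime)
                --numNodes;
            if (numNodes <= 0)
                return true;
        }
        return false;
    }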
+
+bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime,
+ const ReplSetTagPattern& tagPattern,
+ bool durablyWritten) {
+ ReplSetTagMatch matcher(tagPattern);
+ for (auto&& memberData : _memberData) {
+ const OpTime& memberOpTime =
+ durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
+ if (memberOpTime >= opTime) {
+ // This node has reached the desired optime, now we need to check if it is a part
+ // of the tagPattern.
+ int memberIndex = memberData.getConfigIndex();
+ invariant(memberIndex >= 0);
+ const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex);
+ for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
+ it != memberConfig.tagsEnd();
+ ++it) {
+ if (matcher.update(*it)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) {
+ bool stepdown = false;
+ for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) {
+ auto& memberData = _memberData[memberIndex];
+ if (!memberData.isSelf() && !memberData.lastUpdateStale() &&
+ now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) {
+ memberData.markLastUpdateStale();
+ if (_iAmPrimary()) {
+ stepdown = stepdown || setMemberAsDown(now, memberIndex);
+ }
+ }
+ }
+ if (stepdown) {
+ log() << "can't see a majority of the set, relinquishing primary";
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ }
+ return HeartbeatResponseAction::makeNoAction();
+}
+
+std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
+ bool durablyWritten,
+ bool skipSelf) {
+ std::vector<HostAndPort> hosts;
+ for (const auto& memberData : _memberData) {
+ if (skipSelf && memberData.isSelf()) {
+ continue;
+ }
+
+ if (durablyWritten) {
+ if (memberData.getLastDurableOpTime() < op) {
+ continue;
+ }
+ } else if (memberData.getLastAppliedOpTime() < op) {
+ continue;
+ }
+
+ hosts.push_back(memberData.getHostAndPort());
+ }
+ return hosts;
+}
+
+bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) {
+ invariant(memberIndex != _selfIndex);
+ invariant(memberIndex != -1);
+ invariant(_currentPrimaryIndex == _selfIndex);
+ MemberData& hbData = _memberData.at(memberIndex);
+ hbData.setDownValues(now, "no response within election timeout period");
+
+ if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
+ return true;
+ }
+
+ return false;
+}
+
+std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const {
+ Date_t earliestDate = Date_t::max();
+ int earliestMemberId = -1;
+ for (const auto& memberData : _memberData) {
+ if (memberData.isSelf()) {
+ continue;
+ }
+ if (memberData.lastUpdateStale()) {
+ // Already stale.
+ continue;
+ }
+ LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate();
+ if (earliestDate > memberData.getLastUpdate()) {
+ earliestDate = memberData.getLastUpdate();
+ earliestMemberId = memberData.getMemberId();
+ }
+ }
+ LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate;
+ return std::make_pair(earliestMemberId, earliestDate);
+}
+
+void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) {
+ for (auto&& memberData : _memberData)
+ memberData.updateLiveness(now);
+}
+
+void TopologyCoordinatorImpl::resetMemberTimeouts(
+ Date_t now, const stdx::unordered_set<HostAndPort>& member_set) {
+ for (auto&& memberData : _memberData) {
+ if (member_set.count(memberData.getHostAndPort()))
+ memberData.updateLiveness(now);
+ }
+}
+
+OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const {
+ return _selfMemberData().getLastAppliedOpTime();
+}
+
+OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const {
+ return _selfMemberData().getLastDurableOpTime();
+}
+
+MemberData* TopologyCoordinatorImpl::getMyMemberData() {
+ return &_memberData[_selfMemberDataIndex()];
+}
+
+MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) {
+ const int memberIndex = _getMemberIndex(memberId);
+ if (memberIndex >= 0)
+ return &_memberData[memberIndex];
+ return nullptr;
+}
+
+MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) {
+ for (auto& memberData : _memberData) {
+ if (memberData.getRid() == rid)
+ return &memberData;
+ }
+ return nullptr;
+}
+
+MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) {
+ invariant(!_memberData.empty()); // Must always have our own entry first.
+ invariant(!_rsConfig.isInitialized()); // Used only for master-slave.
+ _memberData.emplace_back();
+ auto* result = &_memberData.back();
+ result->setRid(rid);
+ return result;
+}
+
+HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
+ int updatedConfigIndex, const MemberState& originalState, Date_t now) {
+ //
+ // Updates the local notion of which remote node, if any is primary.
+ // Start the priority takeover process if we are eligible.
+ //
+
+ invariant(updatedConfigIndex != _selfIndex);
+
+ // If we are missing from the config, do not participate in primary maintenance or election.
+ if (_selfIndex == -1) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ // If we are the primary, there must be no other primary, otherwise its higher term would
+ // have already made us step down.
+ if (_currentPrimaryIndex == _selfIndex) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
+ int primaryIndex = -1;
+ for (size_t i = 0; i < _memberData.size(); i++) {
+ const MemberData& member = _memberData.at(i);
+ if (member.getState().primary() && member.up()) {
+ if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) {
+ primaryIndex = i;
+ }
+ }
+ }
+ _currentPrimaryIndex = primaryIndex;
+ if (_currentPrimaryIndex == -1) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ // Clear last heartbeat message on ourselves.
+ setMyHeartbeatMessage(now, "");
+
+ // Takeover when the replset is stable.
+ //
+ // Take over the primary only if the remote primary is in the latest term I know.
+ // This is done only when we get a heartbeat response from the primary.
+ // Otherwise, there must be an outstanding election, which may succeed or not, but
+ // the remote primary will become aware of that election eventually and step down.
+ if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {
+
+ // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
+ bool catchupTakeoverDisabled =
+ ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
+ ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();
+ if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() <
+ _memberData.at(_selfIndex).getLastAppliedOpTime())) {
+ LOG(2) << "I can take over the primary due to fresher data."
+ << " Current primary index: " << primaryIndex << " in term "
+ << _memberData.at(primaryIndex).getTerm() << "."
+ << " Current primary optime: "
+ << _memberData.at(primaryIndex).getLastAppliedOpTime()
+ << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();
+
+ return HeartbeatResponseAction::makeCatchupTakeoverAction();
+ }
+
+ if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
+ _rsConfig.getMemberAt(_selfIndex).getPriority()) {
+ LOG(4) << "I can take over the primary due to higher priority."
+ << " Current primary index: " << primaryIndex << " in term "
+ << _memberData.at(primaryIndex).getTerm();
+
+ return HeartbeatResponseAction::makePriorityTakeoverAction();
+ }
+ }
+ return HeartbeatResponseAction::makeNoAction();
+}
+
+HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
+ int updatedConfigIndex, const MemberState& originalState, Date_t now) {
+ // This method has two interrelated responsibilities, performed in two phases.
+ //
+ // First, it updates the local notion of which remote node, if any is primary. In the
+ // process, it may request a remote primary to step down because there is a higher priority
+ // node waiting, or because the local node thinks it is primary and that it has a more
+ // recent electionTime. It may instead decide that the local node should step down itself,
+ // because a remote has a more recent election time.
+ //
+ // Second, if there is no remote primary, and the local node is not primary, it considers
+ // whether or not to stand for election.
+ invariant(updatedConfigIndex != _selfIndex);
+
+ // We are missing from the config, so do not participate in primary maintenance or election.
+ if (_selfIndex == -1) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ ////////////////////
+ // Phase 1
+ ////////////////////
+
+ // If we believe the node whose data was just updated is primary, confirm that
+ // the updated data supports that notion. If not, erase our notion of who is primary.
+ if (updatedConfigIndex == _currentPrimaryIndex) {
+ const MemberData& updatedHBData = _memberData.at(updatedConfigIndex);
+ if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
+ _currentPrimaryIndex = -1;
+ }
+ }
+
+ // If the current primary is not highest priority and up to date (within 10s),
+ // have them/me stepdown.
+ if (_currentPrimaryIndex != -1) {
+ // check if we should ask the primary (possibly ourselves) to step down
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
+ if (highestPriorityIndex != -1) {
+ const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
+ const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
+ const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
+ ? getMyLastAppliedOpTime()
+ : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime();
+
+ if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
+ _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) {
+ const OpTime latestOpTime = _latestKnownOpTime();
+
+ if (_iAmPrimary()) {
+ if (_leaderMode == LeaderMode::kSteppingDown) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
+ << ") because " << highestPriorityMember.getHostAndPort()
+ << " has higher priority " << highestPriorityMember.getPriority()
+ << " and is only "
+ << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
+ << " seconds behind me";
+ const Date_t until =
+ now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
+ if (_electionSleepUntil < until) {
+ _electionSleepUntil = until;
+ }
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
+ // If this node is the highest priority node, and it is not in
+ // an inter-election sleep period, ask the current primary to step down.
+ // This is an optimization, because the remote primary will almost certainly
+ // notice this node's electability promptly, via its own heartbeat process.
+ log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
+ << " (priority " << currentPrimaryMember.getPriority()
+ << ") step down because I have higher priority "
+ << highestPriorityMember.getPriority() << " and am only "
+ << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
+ << " seconds behind it";
+ int primaryIndex = _currentPrimaryIndex;
+ _currentPrimaryIndex = -1;
+ return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
+ }
+ }
+ }
+ }
+
+ // Scan the member list's heartbeat data for who is primary, and update
+ // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
+ {
+ int remotePrimaryIndex = -1;
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin();
+ it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ if (itIndex == _selfIndex) {
+ continue;
+ }
+
+ if (it->getState().primary() && it->up()) {
+ if (remotePrimaryIndex != -1) {
+ // two other nodes think they are primary (asynchronously polled)
+ // -- wait for things to settle down.
+ warning() << "two remote primaries (transiently)";
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ remotePrimaryIndex = itIndex;
+ }
+ }
+
+ if (remotePrimaryIndex != -1) {
+ // If it's the same as last time, don't do anything further.
+ if (_currentPrimaryIndex == remotePrimaryIndex) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ // Clear last heartbeat message on ourselves (why?)
+ setMyHeartbeatMessage(now, "");
+
+ // If we are also primary, this is a problem. Determine who should step down.
+ if (_iAmPrimary()) {
+ Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime();
+ log() << "another primary seen with election time " << remoteElectionTime
+ << " my election time is " << _electionTime;
+
+ // Step down whomever has the older election time.
+ if (remoteElectionTime > _electionTime) {
+ if (_leaderMode == LeaderMode::kSteppingDown) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ log() << "stepping down; another primary was elected more recently";
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ } else {
+ log() << "another PRIMARY detected and it should step down"
+ " since it was elected earlier than me";
+ return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
+ }
+ }
+
+ _currentPrimaryIndex = remotePrimaryIndex;
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ }
+
+ ////////////////////
+ // Phase 2
+ ////////////////////
+
+ // We do not believe any remote to be primary.
+
+ // If we are primary, check if we can still see majority of the set;
+ // stepdown if we can't.
+ if (_iAmPrimary()) {
+ if (CannotSeeMajority &
+ _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
+ if (_leaderMode == LeaderMode::kSteppingDown) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ log() << "can't see a majority of the set, relinquishing primary";
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ }
+
+ LOG(2) << "Choosing to remain primary";
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ fassert(18505, _currentPrimaryIndex == -1);
+
+ const MemberState currentState = getMemberState();
+ if (originalState.recovering() && currentState.secondary()) {
+ // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
+ // received a heartbeat with an auth error when previously all the heartbeats we'd
+ // received had auth errors. In this case, don't return makeElectAction() because
+ // that could cause the election to start before the ReplicationCoordinator has updated
+ // its notion of the member state to SECONDARY. Instead return noAction so that the
+ // ReplicationCoordinator knows to update its tracking of the member state off of the
+ // TopologyCoordinator, and leave starting the election until the next heartbeat comes
+ // back.
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ // At this point, there is no primary anywhere. Check to see if we should become a candidate.
+ const auto status = checkShouldStandForElection(now);
+ if (!status.isOK()) {
+ // NOTE: This log line is checked in unit test(s).
+ LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason();
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
+ return HeartbeatResponseAction::makeElectAction();
+}
+
+Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const {
+ if (_currentPrimaryIndex != -1) {
+ return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
+ }
+ invariant(_role != Role::leader);
+
+ if (_role == Role::candidate) {
+ return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
+ }
+
+ const UnelectableReasonMask unelectableReason =
+ _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout);
+ if (NotCloseEnoughToLatestOptime & unelectableReason) {
+ return {ErrorCodes::NodeNotElectable,
+ str::stream() << "Not standing for election because "
+ << _getUnelectableReasonString(unelectableReason)
+ << "; my last optime is "
+ << getMyLastAppliedOpTime().toString()
+ << " and the newest is "
+ << _latestKnownOpTime().toString()};
+ }
+ if (unelectableReason) {
+ return {ErrorCodes::NodeNotElectable,
+ str::stream() << "Not standing for election because "
+ << _getUnelectableReasonString(unelectableReason)};
+ }
+ if (_electionSleepUntil > now) {
+ if (_rsConfig.getProtocolVersion() == 1) {
+ return {
+ ErrorCodes::NodeNotElectable,
+ str::stream() << "Not standing for election before "
+ << dateToISOStringLocal(_electionSleepUntil)
+ << " because I stood up or learned about a new term too recently"};
+ } else {
+ return {ErrorCodes::NodeNotElectable,
+ str::stream() << "Not standing for election before "
+ << dateToISOStringLocal(_electionSleepUntil)
+ << " because I stood too recently"};
+ }
+ }
+ // All checks passed. Start election proceedings.
+ return Status::OK();
+}
+
+bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
+ int vUp = 0;
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ if (itIndex == _selfIndex || it->up()) {
+ vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
+ }
+ }
+
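+ // A strict majority is required: the visible votes (including our own) must exceed half
+ // of the total voting membership.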
+ return vUp * 2 > _rsConfig.getTotalVotingMembers();
+}
+
+int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority(
+ const int candidateIndex) const {
+ const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
+ for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
+ if (!it->up() || it->getState() != MemberState::RS_PRIMARY) {
+ continue;
+ }
+ const int itIndex = indexOfIterator(_memberData, it);
+ const double priority = _rsConfig.getMemberAt(itIndex).getPriority();
+ if (itIndex != candidateIndex && priority >= candidatePriority) {
+ return itIndex;
+ }
+ }
+
+ return -1;
+}
+
+bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
+ const OpTime latestKnownOpTime = _latestKnownOpTime();
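+ // Under protocol version 0, a member is considered close enough to the latest optime
+ // only if it is within 10 seconds of the latest optime known to this node.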
+ // Use addition instead of subtraction to avoid overflow.
+ return otherOpTime.getSecs() + 10 >= latestKnownOpTime.getSecs();
+}
+
+bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const {
+ const OpTime latestKnownOpTime = _latestKnownOpTime();
+
+ // Rules are:
+ // - If the terms don't match, we don't call for priority takeover.
+ // - If our optime and the latest optime happen in different seconds, our optime must be within
+ // priorityTakeoverFreshnessWindowSeconds seconds of the latest optime.
+ // - If our optime and the latest optime happen in the same second, our optime must be within
+ // 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp
+ // must be within 1000). This is to handle the case where a primary had its clock set far into
+ // the future, took some writes, then had its clock set back. In that case the timestamp
+ // component of all future oplog entries generated will be the same, until real world time
+ // passes the timestamp component of the last oplog entry.
+
+ const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
+ if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) {
+ return false;
+ }
+
+ if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) {
+ return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >=
+ latestKnownOpTime.getTimestamp().getSecs();
+ } else {
+ return ourLastOpApplied.getTimestamp().getInc() + 1000 >=
+ latestKnownOpTime.getTimestamp().getInc();
+ }
+}
+
+bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const {
+
+ const OpTime latestKnownOpTime = _latestKnownOpTime();
+
+ // Rules are:
+ // - We must have the freshest optime of all the up nodes.
+ // - We must specifically have a fresher optime than the primary (can't be equal).
+ // - The term of our last applied op must be less than the current term. This ensures that no
+ // writes have happened since the most recent election and that the primary is still in
+ // catchup mode.
+
+ // There is no point to a catchup takeover if we aren't the freshest node because
+ // another node would immediately perform another catchup takeover when we become primary.
+ const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
+ if (ourLastOpApplied < latestKnownOpTime) {
+ return false;
+ }
+
+ if (_currentPrimaryIndex == -1) {
+ return false;
+ }
+
+ // If we aren't ahead of the primary, there is no point to having a catchup takeover.
+ const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime();
+
+ if (ourLastOpApplied <= primaryLastOpApplied) {
+ return false;
+ }
+
+ // If the term of our last applied op is less than the current term, the primary didn't write
+ // anything and it is still in catchup mode.
+ return ourLastOpApplied.getTerm() < _term;
+}
+
+bool TopologyCoordinatorImpl::_iAmPrimary() const {
+ if (_role == Role::leader) {
+ invariant(_currentPrimaryIndex == _selfIndex);
+ invariant(_leaderMode != LeaderMode::kNotLeader);
+ return true;
+ }
+ return false;
+}
+
+OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const {
+ OpTime latest = getMyLastAppliedOpTime();
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
+ ++it) {
+ // Ignore self
+ // TODO(russotto): Simplify when heartbeat and spanning tree times are combined.
+ if (it->isSelf()) {
+ continue;
+ }
+ // Ignore down members
+ if (!it->up()) {
+ continue;
+ }
+ // Ignore removed nodes (not in config, so not valid).
+ if (it->getState().removed()) {
+ continue;
+ }
+
+ OpTime optime = it->getHeartbeatAppliedOpTime();
+
+ if (optime > latest) {
+ latest = optime;
+ }
+ }
+
+ return latest;
+}
+
+bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
+ int memberTwoIndex) const {
+ if (memberOneIndex == -1)
+ return false;
+
+ if (memberTwoIndex == -1)
+ return true;
+
+ return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
+ _rsConfig.getMemberAt(memberTwoIndex).getPriority();
+}
+
+int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const {
+ int maxIndex = -1;
+ for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
+ UnelectableReasonMask reason = currentIndex == _selfIndex
+ ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)
+ : _getUnelectableReason(currentIndex);
+ if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
+ maxIndex = currentIndex;
+ }
+ }
+
+ return maxIndex;
+}
+
+bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() {
+ if (_leaderMode == LeaderMode::kSteppingDown) {
+ // Can only be processing one required stepdown at a time.
+ return false;
+ }
+ // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so
+ // it's safe to transition from kAttemptingStepDown to kSteppingDown.
+ _setLeaderMode(LeaderMode::kSteppingDown);
+ return true;
+}
+
+Status TopologyCoordinatorImpl::prepareForStepDownAttempt() {
+ if (_leaderMode == LeaderMode::kSteppingDown ||
+ _leaderMode == LeaderMode::kAttemptingStepDown) {
+ return Status{ErrorCodes::ConflictingOperationInProgress,
+ "This node is already in the process of stepping down"};
+ }
+ _setLeaderMode(LeaderMode::kAttemptingStepDown);
+ return Status::OK();
+}
+
+void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() {
+ if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
+ _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster);
+ }
+}
+
+void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState,
+ const Timestamp& electionTime) {
+ invariant(_selfIndex != -1);
+ if (newMemberState == getMemberState())
+ return;
+ switch (newMemberState.s) {
+ case MemberState::RS_PRIMARY:
+ _role = Role::candidate;
+ processWinElection(OID(), electionTime);
+ invariant(_role == Role::leader);
+ break;
+ case MemberState::RS_SECONDARY:
+ case MemberState::RS_ROLLBACK:
+ case MemberState::RS_RECOVERING:
+ case MemberState::RS_STARTUP2:
+ _role = Role::follower;
+ _followerMode = newMemberState.s;
+ if (_currentPrimaryIndex == _selfIndex) {
+ _currentPrimaryIndex = -1;
+ _setLeaderMode(LeaderMode::kNotLeader);
+ }
+ break;
+ case MemberState::RS_STARTUP:
+ updateConfig(ReplSetConfig(), -1, Date_t());
+ break;
+ default:
+ severe() << "Cannot switch to state " << newMemberState;
+ invariant(false);
+ }
+ if (getMemberState() != newMemberState.s) {
+ severe() << "Expected to enter state " << newMemberState << " but am now in "
+ << getMemberState();
+ invariant(false);
+ }
+ log() << newMemberState;
+}
+
+void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
+ if (primaryIndex == _selfIndex) {
+ changeMemberState_forTest(MemberState::RS_PRIMARY);
+ } else {
+ if (_iAmPrimary()) {
+ changeMemberState_forTest(MemberState::RS_SECONDARY);
+ }
+ if (primaryIndex != -1) {
+ ReplSetHeartbeatResponse hbResponse;
+ hbResponse.setState(MemberState::RS_PRIMARY);
+ hbResponse.setElectionTime(Timestamp());
+ hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
+ hbResponse.setSyncingTo(HostAndPort());
+ hbResponse.setHbMsg("");
+ _memberData.at(primaryIndex)
+ .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
+ std::move(hbResponse));
+ }
+ _currentPrimaryIndex = primaryIndex;
+ }
+}
+
+const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
+ if (_currentPrimaryIndex == -1)
+ return NULL;
+
+ return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
+}
+
+void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
+ BSONObjBuilder* response,
+ Status* result) {
+ // output for each member
+ vector<BSONObj> membersOut;
+ const MemberState myState = getMemberState();
+ const Date_t now = rsStatusArgs.now;
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ const OpTime lastOpDurable = getMyLastDurableOpTime();
+ const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus;
+
+ if (_selfIndex == -1) {
+ // We're REMOVED or have an invalid config
+ response->append("state", static_cast<int>(myState.s));
+ response->append("stateStr", myState.toString());
+ response->append("uptime", rsStatusArgs.selfUptime);
+
+ appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
+
+ response->appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
+ if (_maintenanceModeCalls) {
+ response->append("maintenanceMode", _maintenanceModeCalls);
+ }
+ std::string s = _getHbmsg(now);
+ if (!s.empty())
+ response->append("infoMessage", s);
+ *result = Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set config is invalid or we are not a member of it");
+ return;
+ }
+
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ if (itIndex == _selfIndex) {
+ // add self
+ BSONObjBuilder bb;
+ bb.append("_id", _selfConfig().getId());
+ bb.append("name", _selfConfig().getHostAndPort().toString());
+ bb.append("health", 1.0);
+ bb.append("state", static_cast<int>(myState.s));
+ bb.append("stateStr", myState.toString());
+ bb.append("uptime", rsStatusArgs.selfUptime);
+ if (!_selfConfig().isArbiter()) {
+ appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
+ bb.appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
+ }
+
+ if (!_syncSource.empty() && !_iAmPrimary()) {
+ bb.append("syncingTo", _syncSource.toString());
+ }
+
+ if (_maintenanceModeCalls) {
+ bb.append("maintenanceMode", _maintenanceModeCalls);
+ }
+
+ std::string s = _getHbmsg(now);
+ if (!s.empty())
+ bb.append("infoMessage", s);
+
+ if (myState.primary()) {
+ bb.append("electionTime", _electionTime);
+ bb.appendDate("electionDate",
+ Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
+ }
+ bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
+ bb.append("self", true);
+ membersOut.push_back(bb.obj());
+ } else {
+ // add non-self member
+ const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
+ BSONObjBuilder bb;
+ bb.append("_id", itConfig.getId());
+ bb.append("name", itConfig.getHostAndPort().toString());
+ double h = it->getHealth();
+ bb.append("health", h);
+ const MemberState state = it->getState();
+ bb.append("state", static_cast<int>(state.s));
+ if (h == 0) {
+ // if we can't connect the state info is from the past
+ // and could be confusing to show
+ bb.append("stateStr", "(not reachable/healthy)");
+ } else {
+ bb.append("stateStr", it->getState().toString());
+ }
+
+ const unsigned int uptime = static_cast<unsigned int>((
+ it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0));
+ bb.append("uptime", uptime);
+ if (!itConfig.isArbiter()) {
+ appendOpTime(
+ &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion());
+ appendOpTime(&bb,
+ "optimeDurable",
+ it->getHeartbeatDurableOpTime(),
+ _rsConfig.getProtocolVersion());
+
+ bb.appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(
+ Seconds(it->getHeartbeatAppliedOpTime().getSecs())));
+ bb.appendDate("optimeDurableDate",
+ Date_t::fromDurationSinceEpoch(
+ Seconds(it->getHeartbeatDurableOpTime().getSecs())));
+ }
+ bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
+ bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
+ Milliseconds ping = _getPing(itConfig.getHostAndPort());
+ if (ping != UninitializedPing) {
+ bb.append("pingMs", durationCount<Milliseconds>(ping));
+ std::string s = it->getLastHeartbeatMsg();
+ if (!s.empty())
+ bb.append("lastHeartbeatMessage", s);
+ }
+ if (it->hasAuthIssue()) {
+ bb.append("authenticated", false);
+ }
+ const HostAndPort& syncSource = it->getSyncSource();
+ if (!syncSource.empty() && !state.primary()) {
+ bb.append("syncingTo", syncSource.toString());
+ }
+
+ if (state == MemberState::RS_PRIMARY) {
+ bb.append("electionTime", it->getElectionTime());
+ bb.appendDate(
+ "electionDate",
+ Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
+ }
+ bb.appendIntOrLL("configVersion", it->getConfigVersion());
+ membersOut.push_back(bb.obj());
+ }
+ }
+
+ // sort members bson
+ sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan());
+
+ response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
+ response->append("date", now);
+ response->append("myState", myState.s);
+ response->append("term", _term);
+
+ // Add sync source info
+ if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
+ response->append("syncingTo", _syncSource.toString());
+ }
+
+ if (_rsConfig.isConfigServer()) {
+ response->append("configsvr", true);
+ }
+
+ response->append("heartbeatIntervalMillis",
+ durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval()));
+
+ // New optimes, to hold them all.
+ BSONObjBuilder optimes;
+ _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
+ if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
+ rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
+ }
+
+ appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
+ appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
+ response->append("optimes", optimes.obj());
+
+ if (!initialSyncStatus.isEmpty()) {
+ response->append("initialSyncStatus", initialSyncStatus);
+ }
+
+ response->append("members", membersOut);
+ *result = Status::OK();
+}
+
+StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
+ OpTime currentCommittedSnapshotOpTime) const {
+ BSONObjBuilder cmdBuilder;
+ invariant(_rsConfig.isInitialized());
+ // Do not send updates if we have been removed from the config.
+ if (_selfIndex == -1) {
+ return Status(ErrorCodes::NodeNotFound,
+ "This node is not in the current replset configuration.");
+ }
+ cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
+ // Create an array containing an entry for each live member connected to us and for ourself.
+ BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
+ for (const auto& memberData : _memberData) {
+ if (memberData.getLastAppliedOpTime().isNull()) {
+ // Don't include info on members we haven't heard from yet.
+ continue;
+ }
+ // Don't include members we think are down.
+ if (!memberData.isSelf() && memberData.lastUpdateStale()) {
+ continue;
+ }
+
+ BSONObjBuilder entry(arrayBuilder.subobjStart());
+ switch (commandStyle) {
+ case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle:
+ memberData.getLastDurableOpTime().append(
+ &entry, UpdatePositionArgs::kDurableOpTimeFieldName);
+ memberData.getLastAppliedOpTime().append(
+ &entry, UpdatePositionArgs::kAppliedOpTimeFieldName);
+ break;
+ case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle:
+ entry.append("_id", memberData.getRid());
+ if (_rsConfig.getProtocolVersion() == 1) {
+ memberData.getLastDurableOpTime().append(&entry, "optime");
+ } else {
+ entry.append("optime", memberData.getLastDurableOpTime().getTimestamp());
+ }
+ break;
+ }
+ entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
+ entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
+ }
+ arrayBuilder.done();
+
+ // Add metadata to command. Old style parsing logic will reject the metadata.
+ if (commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) {
+ prepareReplSetMetadata(currentCommittedSnapshotOpTime)
+ .writeToMetadata(&cmdBuilder)
+ .transitional_ignore();
+ }
+ return cmdBuilder.obj();
+}
+
+void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) {
+ BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
+ {
+ for (const auto& memberData : _memberData) {
+ BSONObjBuilder entry(replicationProgress.subobjStart());
+ entry.append("rid", memberData.getRid());
+ const auto lastDurableOpTime = memberData.getLastDurableOpTime();
+ if (_rsConfig.getProtocolVersion() == 1) {
+ BSONObjBuilder opTime(entry.subobjStart("optime"));
+ opTime.append("ts", lastDurableOpTime.getTimestamp());
+ opTime.append("term", lastDurableOpTime.getTerm());
+ opTime.done();
+ } else {
+ entry.append("optime", lastDurableOpTime.getTimestamp());
+ }
+ entry.append("host", memberData.getHostAndPort().toString());
+ if (_selfIndex >= 0) {
+ const int memberId = memberData.getMemberId();
+ invariant(memberId >= 0);
+ entry.append("memberId", memberId);
+ }
+ }
+ }
+}
+
+void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
+ const MemberState myState = getMemberState();
+ if (!_rsConfig.isInitialized()) {
+ response->markAsNoConfig();
+ return;
+ }
+
+ for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
+ ++it) {
+ if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
+ continue;
+ }
+
+ if (it->isElectable()) {
+ response->addHost(it->getHostAndPort());
+ } else if (it->isArbiter()) {
+ response->addArbiter(it->getHostAndPort());
+ } else {
+ response->addPassive(it->getHostAndPort());
+ }
+ }
+
+ response->setReplSetName(_rsConfig.getReplSetName());
+ if (myState.removed()) {
+ response->markAsNoConfig();
+ return;
+ }
+
+ response->setReplSetVersion(_rsConfig.getConfigVersion());
+ response->setIsMaster(myState.primary());
+ response->setIsSecondary(myState.secondary());
+
+ const MemberConfig* curPrimary = _currentPrimaryMember();
+ if (curPrimary) {
+ response->setPrimary(curPrimary->getHostAndPort());
+ }
+
+ const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
+ if (selfConfig.isArbiter()) {
+ response->setIsArbiterOnly(true);
+ } else if (selfConfig.getPriority() == 0) {
+ response->setIsPassive(true);
+ }
+ if (selfConfig.getSlaveDelay() > Seconds(0)) {
+ response->setSlaveDelay(selfConfig.getSlaveDelay());
+ }
+ if (selfConfig.isHidden()) {
+ response->setIsHidden(true);
+ }
+ if (!selfConfig.shouldBuildIndexes()) {
+ response->setShouldBuildIndexes(false);
+ }
+ const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
+ if (selfConfig.hasTags(tagConfig)) {
+ for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
+ ++tag) {
+ std::string tagKey = tagConfig.getTagKey(*tag);
+ if (tagKey[0] == '$') {
+ // Filter out internal tags
+ continue;
+ }
+ response->addTag(tagKey, tagConfig.getTagValue(*tag));
+ }
+ }
+ response->setMe(selfConfig.getHostAndPort());
+ if (_iAmPrimary()) {
+ response->setElectionId(_electionId);
+ }
+}
+
+StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult>
+TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
+ if (_role != TopologyCoordinator::Role::follower) {
+ std::string msg = str::stream()
+ << "cannot freeze node when primary or running for election. state: "
+ << (_role == TopologyCoordinator::Role::leader ? "Primary" : "Running-Election");
+ log() << msg;
+ return Status(ErrorCodes::NotSecondary, msg);
+ }
+
+ if (secs == 0) {
+ _stepDownUntil = now;
+ log() << "'unfreezing'";
+ response->append("info", "unfreezing");
+
+ if (_isElectableNodeInSingleNodeReplicaSet()) {
+ // If we are a one-node replica set, we're the one member,
+ // we're electable, we're not in maintenance mode, and we are currently in followerMode
+ // SECONDARY, we must transition to candidate now that our stepdown period
+ // is no longer active, in lieu of heartbeats.
+ _role = Role::candidate;
+ return PrepareFreezeResponseResult::kElectSelf;
+ }
+ } else {
+ if (secs == 1)
+ response->append("warning", "you really want to freeze for only 1 second?");
+
+ _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
+ log() << "'freezing' for " << secs << " seconds";
+ }
+
+ return PrepareFreezeResponseResult::kNoAction;
+}
+
+bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
+ if (_stepDownUntil > now) {
+ return false;
+ }
+
+ if (_isElectableNodeInSingleNodeReplicaSet()) {
+ // If the new config describes a one-node replica set, we're the one member,
+ // we're electable, we're not in maintenance mode, and we are currently in followerMode
+ // SECONDARY, we must transition to candidate, in lieu of heartbeats.
+ _role = Role::candidate;
+ return true;
+ }
+ return false;
+}
+
+void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) {
+ if (_electionSleepUntil < newTime) {
+ _electionSleepUntil = newTime;
+ }
+}
+
+Timestamp TopologyCoordinatorImpl::getElectionTime() const {
+ return _electionTime;
+}
+
+OID TopologyCoordinatorImpl::getElectionId() const {
+ return _electionId;
+}
+
+int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const {
+ return _currentPrimaryIndex;
+}
+
+Date_t TopologyCoordinatorImpl::getStepDownTime() const {
+ return _stepDownUntil;
+}
+
+void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
+ int selfIndex,
+ Date_t now) {
+ std::vector<MemberData> oldHeartbeats;
+ _memberData.swap(oldHeartbeats);
+
+ int index = 0;
+ for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd();
+ ++it, ++index) {
+ const MemberConfig& newMemberConfig = *it;
+ MemberData newHeartbeatData;
+ for (auto&& oldMemberData : oldHeartbeats) {
+ if ((oldMemberData.getMemberId() == newMemberConfig.getId() &&
+ oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) ||
+ (index == selfIndex && oldMemberData.isSelf())) {
+ // This member existed in the old config with the same member ID and
+ // HostAndPort, so copy its heartbeat data over.
+ newHeartbeatData = oldMemberData;
+ break;
+ }
+ }
+ newHeartbeatData.setConfigIndex(index);
+ newHeartbeatData.setIsSelf(index == selfIndex);
+ newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort());
+ newHeartbeatData.setMemberId(newMemberConfig.getId());
+ _memberData.push_back(newHeartbeatData);
+ }
+ if (selfIndex < 0) {
+ // It's necessary to have self member data even if self isn't in the configuration.
+ // We don't need data for the other nodes (which no longer know about us, or soon won't)
+ _memberData.clear();
+ // We're not in the config, we can't sync any more.
+ _syncSource = HostAndPort();
+ MemberData newHeartbeatData;
+ for (auto&& oldMemberData : oldHeartbeats) {
+ if (oldMemberData.isSelf()) {
+ newHeartbeatData = oldMemberData;
+ break;
+ }
+ }
+ newHeartbeatData.setConfigIndex(-1);
+ newHeartbeatData.setIsSelf(true);
+ _memberData.push_back(newHeartbeatData);
+ }
+}
+
+// This function installs a new config object and recreates MemberData objects
+// that reflect the new config.
+void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig,
+ int selfIndex,
+ Date_t now) {
+ invariant(_role != Role::candidate);
+ invariant(selfIndex < newConfig.getNumMembers());
+
+ // Reset term on startup and upgrade/downgrade of protocol version.
+ if (!_rsConfig.isInitialized() ||
+ _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) {
+ if (newConfig.getProtocolVersion() == 1) {
+ _term = OpTime::kInitialTerm;
+ } else {
+ invariant(newConfig.getProtocolVersion() == 0);
+ _term = OpTime::kUninitializedTerm;
+ }
+ LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config";
+ }
+
+ _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
+ _rsConfig = newConfig;
+ _selfIndex = selfIndex;
+ _forceSyncSourceIndex = -1;
+
+ if (_role == Role::leader) {
+ if (_selfIndex == -1) {
+ log() << "Could not remain primary because no longer a member of the replica set";
+ } else if (!_selfConfig().isElectable()) {
+ log() << " Could not remain primary because no longer electable";
+ } else {
+ // Don't stepdown if you don't have to.
+ _currentPrimaryIndex = _selfIndex;
+ return;
+ }
+ _role = Role::follower;
+ _setLeaderMode(LeaderMode::kNotLeader);
+ }
+
+ // By this point we know we are in Role::follower
+ _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is
+
+ if (_isElectableNodeInSingleNodeReplicaSet()) {
+ // If the new config describes a one-node replica set, we're the one member,
+ // we're electable, we're not in maintenance mode and we are currently in followerMode
+ // SECONDARY, we must transition to candidate, in lieu of heartbeats.
+ _role = Role::candidate;
+ }
+}
+std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
+ // ignore messages over 2 minutes old
+ if ((now - _hbmsgTime) > Seconds{120}) {
+ return "";
+ }
+ return _hbmsg;
+}
+
+void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
+ _hbmsgTime = now;
+ _hbmsg = message;
+}
+
+const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
+ return _rsConfig.getMemberAt(_selfIndex);
+}
+
+const MemberData& TopologyCoordinatorImpl::_selfMemberData() const {
+ return _memberData[_selfMemberDataIndex()];
+}
+
+const int TopologyCoordinatorImpl::_selfMemberDataIndex() const {
+ invariant(!_memberData.empty());
+ if (_selfIndex >= 0)
+ return _selfIndex;
+ // In master-slave mode, the first entry is for self. If there is no config
+ // or we're not in the config, the first-and-only entry should be for self.
+ return 0;
+}
+
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
+ int index) const {
+ invariant(index != _selfIndex);
+ const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
+ const MemberData& hbData = _memberData.at(index);
+ UnelectableReasonMask result = None;
+ if (memberConfig.isArbiter()) {
+ result |= ArbiterIAm;
+ }
+ if (memberConfig.getPriority() <= 0) {
+ result |= NoPriority;
+ }
+ if (hbData.getState() != MemberState::RS_SECONDARY) {
+ result |= NotSecondary;
+ }
+ if (_rsConfig.getProtocolVersion() == 0 &&
+ !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
+ result |= NotCloseEnoughToLatestOptime;
+ }
+ if (hbData.up() && hbData.isUnelectable()) {
+ result |= RefusesToStand;
+ }
+ invariant(result || memberConfig.isElectable());
+ return result;
+}
+
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
+ const Date_t now, StartElectionReason reason) const {
+ UnelectableReasonMask result = None;
+ const OpTime lastApplied = getMyLastAppliedOpTime();
+ if (lastApplied.isNull()) {
+ result |= NoData;
+ }
+ if (!_aMajoritySeemsToBeUp()) {
+ result |= CannotSeeMajority;
+ }
+ if (_selfIndex == -1) {
+ result |= NotInitialized;
+ return result;
+ }
+ if (_selfConfig().isArbiter()) {
+ result |= ArbiterIAm;
+ }
+ if (_selfConfig().getPriority() <= 0) {
+ result |= NoPriority;
+ }
+ if (_stepDownUntil > now) {
+ result |= StepDownPeriodActive;
+ }
+
+ // Cannot be electable unless secondary or already primary
+ if (!getMemberState().secondary() && !_iAmPrimary()) {
+ result |= NotSecondary;
+ }
+
+ if (_rsConfig.getProtocolVersion() == 0) {
+ // Election rules only for protocol version 0.
+ if (_voteLease.whoId != -1 &&
+ _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
+ _voteLease.when + VoteLease::leaseTime >= now) {
+ result |= VotedTooRecently;
+ }
+ if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) {
+ result |= NotCloseEnoughToLatestOptime;
+ }
+ } else {
+ // Election rules only for protocol version 1.
+ invariant(_rsConfig.getProtocolVersion() == 1);
+ if (reason == StartElectionReason::kPriorityTakeover &&
+ !_amIFreshEnoughForPriorityTakeover()) {
+ result |= NotCloseEnoughToLatestForPriorityTakeover;
+ }
+
+ if (reason == StartElectionReason::kCatchupTakeover &&
+ !_amIFreshEnoughForCatchupTakeover()) {
+ result |= NotFreshEnoughForCatchupTakeover;
+ }
+ }
+ return result;
+}
+
+std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
+ const UnelectableReasonMask ur) const {
+ invariant(ur);
+ str::stream ss;
+ bool hasWrittenToStream = false;
+ if (ur & NoData) {
+ ss << "node has no applied oplog entries";
+ hasWrittenToStream = true;
+ }
+ if (ur & VotedTooRecently) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
+ }
+ if (ur & CannotSeeMajority) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "I cannot see a majority";
+ }
+ if (ur & ArbiterIAm) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member is an arbiter";
+ }
+ if (ur & NoPriority) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member has zero priority";
+ }
+ if (ur & StepDownPeriodActive) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "I am still waiting for stepdown period to end at "
+ << dateToISOStringLocal(_stepDownUntil);
+ }
+ if (ur & NotSecondary) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member is not currently a secondary";
+ }
+ if (ur & NotCloseEnoughToLatestOptime) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member is more than 10 seconds behind the most up-to-date member";
+ }
+ if (ur & NotCloseEnoughToLatestForPriorityTakeover) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member is not caught up enough to the most up-to-date member to call for priority "
+ "takeover - must be within "
+ << priorityTakeoverFreshnessWindowSeconds << " seconds";
+ }
+ if (ur & NotFreshEnoughForCatchupTakeover) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "member is either not the most up-to-date member or not ahead of the primary, and "
+ "therefore cannot call for catchup takeover";
+ }
+ if (ur & NotInitialized) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "node is not a member of a valid replica set configuration";
+ }
+ if (ur & RefusesToStand) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "most recent heartbeat indicates node will not stand for election";
+ }
+ if (!hasWrittenToStream) {
+ severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
+ fassertFailed(26011);
+ }
+ ss << " (mask 0x" << integerToHex(ur) << ")";
+ return ss;
+}
+
+Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) {
+ return _pings[host].getMillis();
+}
+
+void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) {
+ _electionTime = newElectionTime;
+}
+
+int TopologyCoordinatorImpl::_getTotalPings() {
+ PingMap::iterator it = _pings.begin();
+ PingMap::iterator end = _pings.end();
+ int totalPings = 0;
+ while (it != end) {
+ totalPings += it->second.getCount();
+ it++;
+ }
+ return totalPings;
+}
+
+std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const {
+ std::vector<HostAndPort> upHosts;
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ if (itIndex == _selfIndex) {
+ continue; // skip ourselves
+ }
+ if (!it->maybeUp()) {
+ continue; // skip DOWN nodes
+ }
-TopologyCoordinator::Role::Role(int value) : _value(value) {}
+ upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
+ }
+ return upHosts;
+}
-std::string TopologyCoordinator::Role::toString() const {
- switch (_value) {
- case kLeaderValue:
- return "leader";
- case kFollowerValue:
- return "follower";
- case kCandidateValue:
- return "candidate";
+bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
+ if (_role != Role::candidate) {
+ return false;
}
- invariant(false);
+ int selfId = _selfConfig().getId();
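+ // Honor the vote lease: do not cast another "yea" vote within VoteLease::leaseTime of
+ // having voted for a different node.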
+ if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
+ log() << "not voting yea for " << selfId << " voted for "
+ << _voteLease.whoHostAndPort.toString() << ' '
+ << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
+ return false;
+ }
+ _voteLease.when = now;
+ _voteLease.whoId = selfId;
+ _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
+ return true;
}
-TopologyCoordinator::~TopologyCoordinator() {}
+bool TopologyCoordinatorImpl::isSteppingDown() const {
+ return _leaderMode == LeaderMode::kAttemptingStepDown ||
+ _leaderMode == LeaderMode::kSteppingDown;
+}
-std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) {
- return os << role.toString();
+void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
+ // Invariants for valid state transitions.
+ switch (_leaderMode) {
+ case LeaderMode::kNotLeader:
+ invariant(newMode == LeaderMode::kLeaderElect);
+ break;
+ case LeaderMode::kLeaderElect:
+ invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
+ newMode == LeaderMode::kMaster ||
+ newMode == LeaderMode::kAttemptingStepDown ||
+ newMode == LeaderMode::kSteppingDown);
+ break;
+ case LeaderMode::kMaster:
+ invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
+ newMode == LeaderMode::kAttemptingStepDown ||
+ newMode == LeaderMode::kSteppingDown);
+ break;
+ case LeaderMode::kAttemptingStepDown:
+ invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster ||
+ newMode == LeaderMode::kSteppingDown);
+ break;
+ case LeaderMode::kSteppingDown:
+ invariant(newMode == LeaderMode::kNotLeader);
+ break;
+ }
+ _leaderMode = std::move(newMode);
}
-std::ostream& operator<<(std::ostream& os,
- TopologyCoordinator::PrepareFreezeResponseResult result) {
- switch (result) {
- case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction:
- return os << "no action";
- case TopologyCoordinator::PrepareFreezeResponseResult::kElectSelf:
- return os << "elect self";
+MemberState TopologyCoordinatorImpl::getMemberState() const {
+ if (_selfIndex == -1) {
+ if (_rsConfig.isInitialized()) {
+ return MemberState::RS_REMOVED;
+ }
+ return MemberState::RS_STARTUP;
+ }
+
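+ // A node whose cluster role disagrees with the config (config server vs. regular replica
+ // set member), or a config server whose storage engine does not support committed reads,
+ // reports itself as REMOVED.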
+ if (_rsConfig.isConfigServer()) {
+ if (_options.clusterRole != ClusterRole::ConfigServer) {
+ return MemberState::RS_REMOVED;
+ } else {
+ invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown);
+ if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) {
+ return MemberState::RS_REMOVED;
+ }
+ }
+ } else {
+ if (_options.clusterRole == ClusterRole::ConfigServer) {
+ return MemberState::RS_REMOVED;
+ }
+ }
+
+ if (_role == Role::leader) {
+ invariant(_currentPrimaryIndex == _selfIndex);
+ invariant(_leaderMode != LeaderMode::kNotLeader);
+ return MemberState::RS_PRIMARY;
+ }
+ const MemberConfig& myConfig = _selfConfig();
+ if (myConfig.isArbiter()) {
+ return MemberState::RS_ARBITER;
+ }
+ if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) &&
+ (_followerMode == MemberState::RS_SECONDARY)) {
+ return MemberState::RS_RECOVERING;
+ }
+ return _followerMode;
+}
+
+bool TopologyCoordinatorImpl::canAcceptWrites() const {
+ return _leaderMode == LeaderMode::kMaster;
+}
+
+void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) {
+ invariant(_role == Role::leader);
+ _electionTime = electionOpTime;
+ _electionId = electionId;
+}
+
+void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) {
+ invariant(_role == Role::candidate);
+ invariant(_leaderMode == LeaderMode::kNotLeader);
+ _role = Role::leader;
+ _setLeaderMode(LeaderMode::kLeaderElect);
+ setElectionInfo(electionId, electionOpTime);
+ _currentPrimaryIndex = _selfIndex;
+ _syncSource = HostAndPort();
+ _forceSyncSourceIndex = -1;
+ // Prevent last committed optime from updating until we finish draining.
+ _firstOpTimeOfMyTerm =
+ OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
+}
+
+void TopologyCoordinatorImpl::processLoseElection() {
+ invariant(_role == Role::candidate);
+ invariant(_leaderMode == LeaderMode::kNotLeader);
+ const HostAndPort syncSourceAddress = getSyncSourceAddress();
+ _electionTime = Timestamp(0, 0);
+ _electionId = OID();
+ _role = Role::follower;
+
+ // Clear voteLease time, if we voted for ourselves in this election.
+ // This will allow us to vote for others.
+ if (_voteLease.whoId == _selfConfig().getId()) {
+ _voteLease.when = Date_t();
+ }
+}
+
+bool TopologyCoordinatorImpl::attemptStepDown(
+ long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
+
+ if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) {
+ uasserted(ErrorCodes::PrimarySteppedDown,
+ "While waiting for secondaries to catch up before stepping down, "
+ "this node decided to step down for other reasons");
+ }
+ invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
+
+ if (now >= stepDownUntil) {
+ uasserted(ErrorCodes::ExceededTimeLimit,
+ "By the time we were ready to step down, we were already past the "
+ "time we were supposed to step down until");
+ }
+
+ if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
+ // Stepdown attempt failed.
+
+ // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
+ // if secondaryCatchUpPeriodSecs == 0.
+ if (now >= waitUntil) {
+ uasserted(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "No electable secondaries caught up as of "
+ << dateToISOStringLocal(now)
+ << "Please use the replSetStepDown command with the argument "
+ << "{force: true} to force node to step down.");
+ }
+
+ // Stepdown attempt failed, but in a way that can be retried
+ return false;
+ }
+
+ // Stepdown attempt success!
+ _stepDownUntil = stepDownUntil;
+ _stepDownSelfAndReplaceWith(-1);
+ return true;
+}
+
+bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
+ Date_t waitUntil,
+ bool force) {
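+ // With {force: true}, the stepdown may complete unconditionally once the catch-up wait
+ // deadline has passed; otherwise the isSafeToStepDown() checks must pass.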
+ const bool forceNow = force && (now >= waitUntil);
+ if (forceNow) {
+ return true;
+ }
+
+ return isSafeToStepDown();
+}
+
+bool TopologyCoordinatorImpl::isSafeToStepDown() {
+ if (!_rsConfig.isInitialized() || _selfIndex < 0) {
+ return false;
+ }
+
+ OpTime lastApplied = getMyLastAppliedOpTime();
+
+ auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
+ invariant(tagStatus.isOK());
+
+ // Check if a majority of nodes have reached the last applied optime.
+ if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
+ return false;
+ }
+
+ // Now check that we also have at least one caught up node that is electable.
+ const OpTime lastOpApplied = getMyLastAppliedOpTime();
+ for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
+ // ignore ourself
+ if (memberIndex == _selfIndex) {
+ continue;
+ }
+ UnelectableReasonMask reason = _getUnelectableReason(memberIndex);
+ if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) {
+ // Found a caught up and electable node, succeed with step down.
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
+ invariant(_role == Role::follower);
+ switch (newMode) {
+ case MemberState::RS_RECOVERING:
+ case MemberState::RS_ROLLBACK:
+ case MemberState::RS_SECONDARY:
+ case MemberState::RS_STARTUP2:
+ _followerMode = newMode;
+ break;
+ default:
+ invariant(false);
+ }
+
+ if (_followerMode != MemberState::RS_SECONDARY) {
+ return;
+ }
+
+ // When a single node replica set transitions to SECONDARY, we must check if we should
+ // be a candidate here. This is necessary because a single node replica set has no
+ // heartbeats that would normally change the role to candidate.
+
+ if (_isElectableNodeInSingleNodeReplicaSet()) {
+ _role = Role::candidate;
+ }
+}
+
+bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const {
+ return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
+ _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
+ _maintenanceModeCalls == 0;
+}
+
+void TopologyCoordinatorImpl::finishUnconditionalStepDown() {
+ invariant(_leaderMode == LeaderMode::kSteppingDown);
+
+ int remotePrimaryIndex = -1;
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ if (itIndex == _selfIndex) {
+ continue;
+ }
+
+ if (it->getState().primary() && it->up()) {
+ if (remotePrimaryIndex != -1) {
+ // two other nodes think they are primary (asynchronously polled)
+ // -- wait for things to settle down.
+ remotePrimaryIndex = -1;
+ warning() << "two remote primaries (transiently)";
+ break;
+ }
+ remotePrimaryIndex = itIndex;
+ }
+ }
+ _stepDownSelfAndReplaceWith(remotePrimaryIndex);
+}
+
+void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
+ invariant(_role == Role::leader);
+ invariant(_selfIndex != -1);
+ invariant(_selfIndex != newPrimary);
+ invariant(_selfIndex == _currentPrimaryIndex);
+ _currentPrimaryIndex = newPrimary;
+ _role = Role::follower;
+ _setLeaderMode(LeaderMode::kNotLeader);
+}
+
+bool TopologyCoordinatorImpl::updateLastCommittedOpTime() {
+ // If we're not primary or we're stepping down due to learning of a new term then we must not
+ // advance the commit point. If we are stepping down due to a user request, however, then it
+ // is safe to advance the commit point, and in fact we must since the stepdown request may be
+ // waiting for the commit point to advance enough to be able to safely complete the step down.
+ if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) {
+ return false;
+ }
+
+ // Whether we use the applied or durable OpTime for the commit point is decided here.
+ const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();
+
+ std::vector<OpTime> votingNodesOpTimes;
+ for (const auto& memberData : _memberData) {
+ int memberIndex = memberData.getConfigIndex();
+ invariant(memberIndex >= 0);
+ const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
+ if (memberConfig.isVoter()) {
+ const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
+ : memberData.getLastAppliedOpTime();
+ votingNodesOpTimes.push_back(opTime);
+ }
+ }
+
+ invariant(votingNodesOpTimes.size() > 0);
+ if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
+ return false;
+ }
+ std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
+
+ // need the majority to have this OpTime
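+ // After the ascending sort, the entry at index (size - writeMajority) is the greatest
+ // optime that at least a write majority of the voting nodes have reached.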
+ OpTime committedOpTime =
+ votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
+ return advanceLastCommittedOpTime(committedOpTime);
+}
+
+bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) {
+ if (committedOpTime == _lastCommittedOpTime) {
+ return false; // Hasn't changed, so ignore it.
+ } else if (committedOpTime < _lastCommittedOpTime) {
+ LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
+ << ", currentCommittedOpTime: " << _lastCommittedOpTime;
+ return false; // This may have come from an out-of-order heartbeat. Ignore it.
+ }
+
+ // This check is performed to ensure primaries do not commit an OpTime from a previous term.
+ if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
+ LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
+ << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
+ return false;
+ }
+
+ LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
+ _lastCommittedOpTime = committedOpTime;
+ return true;
+}
+
+OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const {
+ return _lastCommittedOpTime;
+}
+
+bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary(
+ long long termWhenDrainCompleted) const {
+
+ if (termWhenDrainCompleted != _term) {
+ return false;
+ }
+ // Allow completing the transition to primary even when in the middle of a stepdown attempt,
+ // in case the stepdown attempt fails.
+ if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) {
+ return false;
+ }
+
+ return true;
+}
+
+Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
+ if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
+ return Status(ErrorCodes::PrimarySteppedDown,
+ "By the time this node was ready to complete its transition to PRIMARY it "
+ "was no longer eligible to do so");
+ }
+ if (_leaderMode == LeaderMode::kLeaderElect) {
+ _setLeaderMode(LeaderMode::kMaster);
+ }
+ _firstOpTimeOfMyTerm = firstOpTimeOfTerm;
+ return Status::OK();
+}
+
+void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) {
+ invariant(_role == Role::follower);
+ _maintenanceModeCalls += inc;
+ invariant(_maintenanceModeCalls >= 0);
+}
+
+int TopologyCoordinatorImpl::getMaintenanceCount() const {
+ return _maintenanceModeCalls;
+}
+
+TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term,
+ Date_t now) {
+ if (term <= _term) {
+ return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
+ }
+ // Don't run election if we just stood up or learned about a new term.
+ _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod();
+
+ // Don't update the term just yet if we are going to step down, as we don't want to report
+ // that we are primary in the new term.
+ if (_iAmPrimary()) {
+ return TopologyCoordinator::UpdateTermResult::kTriggerStepDown;
+ }
+ LOG(1) << "Updating term from " << _term << " to " << term;
+ _term = term;
+ return TopologyCoordinator::UpdateTermResult::kUpdatedTerm;
+}
+
+
+long long TopologyCoordinatorImpl::getTerm() {
+ return _term;
+}
+
+// TODO(siyuan): Merge _hbdata into _slaveInfo, so that we have a single view of the
+// replset. Passing metadata is unnecessary.
+bool TopologyCoordinatorImpl::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
+ Date_t now) const {
+ // Methodology:
+ // If there exists a viable sync source member other than currentSource, whose oplog has
+ // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's,
+ // return true.
+ // If the currentSource has the same replication progress as we do and has no source for further
+ // progress, return true.
+
+ if (_selfIndex == -1) {
+ log() << "Not choosing new sync source because we are not in the config.";
+ return false;
+ }
+
+ // If the user requested a sync source change, return true.
+ if (_forceSyncSourceIndex != -1) {
+ log() << "Choosing new sync source because the user has requested to use "
+ << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort()
+ << " as a sync source";
+ return true;
+ }
+
+ if (_rsConfig.getProtocolVersion() == 1 &&
+ replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
+ log() << "Choosing new sync source because the config version supplied by " << currentSource
+ << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
+ << _rsConfig.getConfigVersion();
+ return true;
+ }
+
+ const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
+ // PV0 doesn't use metadata, we have to consult _rsConfig.
+ if (currentSourceIndex == -1) {
+ log() << "Choosing new sync source because " << currentSource.toString()
+ << " is not in our config";
+ return true;
+ }
+
+ invariant(currentSourceIndex != _selfIndex);
+
+ // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
+ // ReplSetMetadata.
+ OpTime currentSourceOpTime;
+ int syncSourceIndex = -1;
+ int primaryIndex = -1;
+ if (oqMetadata) {
+ currentSourceOpTime =
+ std::max(oqMetadata->getLastOpApplied(),
+ _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ syncSourceIndex = oqMetadata->getSyncSourceIndex();
+ primaryIndex = oqMetadata->getPrimaryIndex();
+ } else {
+ currentSourceOpTime =
+ std::max(replMetadata.getLastOpVisible(),
+ _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+ syncSourceIndex = replMetadata.getSyncSourceIndex();
+ primaryIndex = replMetadata.getPrimaryIndex();
+ }
+
+ if (currentSourceOpTime.isNull()) {
+ // Haven't received a heartbeat from the sync source yet, so can't tell if we should
+ // change.
+ return false;
+ }
+
+ // Change sync source if they are not ahead of us, and don't have a sync source,
+ // unless they are primary.
+ const OpTime myLastOpTime = getMyLastAppliedOpTime();
+ if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
+ currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
+ std::stringstream logMessage;
+ logMessage << "Choosing new sync source because our current sync source, "
+ << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
+ << ") which is not ahead of ours (" << myLastOpTime
+ << "), it does not have a sync source, and it's not the primary";
+ if (primaryIndex >= 0) {
+ logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
+ } else {
+ logMessage << " (sync source does not know the primary)";
+ }
+ log() << logMessage.str();
+ return true;
+ }
+
+ if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
+ log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
+ "OpTime, "
+ << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource
+ << ", against the OpTimes of the other nodes in this replica set.";
+ } else {
+ unsigned int currentSecs = currentSourceOpTime.getSecs();
+ unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs);
+
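+ // Only consider switching to a member that is up, readable, not blacklisted, a voter if
+ // we are a voter, an index-builder if we build indexes, and whose last applied optime is
+ // more than maxSyncSourceLagSecs ahead of our current sync source's.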
+ for (std::vector<MemberData>::const_iterator it = _memberData.begin();
+ it != _memberData.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_memberData, it);
+ const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
+ if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
+ (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
+ it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
+ goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) {
+ log() << "Choosing new sync source because the most recent OpTime of our sync "
+ "source, "
+ << currentSource << ", is " << currentSourceOpTime.toString()
+ << " which is more than " << _options.maxSyncSourceLagSecs
+ << " behind member " << candidateConfig.getHostAndPort().toString()
+ << " whose most recent OpTime is "
+ << it->getHeartbeatAppliedOpTime().toString();
+ invariant(itIndex != _selfIndex);
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata(
+ const OpTime& lastVisibleOpTime) const {
+ return rpc::ReplSetMetadata(_term,
+ _lastCommittedOpTime,
+ lastVisibleOpTime,
+ _rsConfig.getConfigVersion(),
+ _rsConfig.getReplicaSetId(),
+ _currentPrimaryIndex,
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
+}
+
+rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const {
+ return rpc::OplogQueryMetadata(_lastCommittedOpTime,
+ getMyLastAppliedOpTime(),
+ rbid,
+ _currentPrimaryIndex,
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
+}
+
+void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
+ output->setConfig(_rsConfig);
+ output->setHBData(_memberData);
+ output->setSelfIndex(_selfIndex);
+ output->setPrimaryIndex(_currentPrimaryIndex);
+ output->setSelfState(getMemberState());
+ output->setSelfHeartbeatMessage(_hbmsg);
+}
+
+void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response) {
+ response->setTerm(_term);
+
+ if (args.getTerm() < _term) {
+ response->setVoteGranted(false);
+ response->setReason(str::stream() << "candidate's term (" << args.getTerm()
+ << ") is lower than mine ("
+ << _term
+ << ")");
+ } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
+ response->setVoteGranted(false);
+ response->setReason(str::stream() << "candidate's config version ("
+ << args.getConfigVersion()
+ << ") differs from mine ("
+ << _rsConfig.getConfigVersion()
+ << ")");
+ } else if (args.getSetName() != _rsConfig.getReplSetName()) {
+ response->setVoteGranted(false);
+ response->setReason(str::stream() << "candidate's set name (" << args.getSetName()
+ << ") differs from mine ("
+ << _rsConfig.getReplSetName()
+ << ")");
+ } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) {
+ response->setVoteGranted(false);
+ response
+ ->setReason(str::stream()
+ << "candidate's data is staler than mine. candidate's last applied OpTime: "
+ << args.getLastDurableOpTime().toString()
+ << ", my last applied OpTime: "
+ << getMyLastAppliedOpTime().toString());
+ } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
+ response->setVoteGranted(false);
+ response->setReason(str::stream()
+ << "already voted for another candidate ("
+ << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort()
+ << ") this term ("
+ << _lastVote.getTerm()
+ << ")");
+ } else {
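+ // An arbiter refuses to vote for a candidate while it can see a healthy primary of
+ // equal or greater priority than the candidate.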
+ int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex());
+ if (_selfConfig().isArbiter() && betterPrimary >= 0) {
+ response->setVoteGranted(false);
+ response->setReason(str::stream()
+ << "can see a healthy primary ("
+ << _rsConfig.getMemberAt(betterPrimary).getHostAndPort()
+ << ") of equal or greater priority");
+ } else {
+ if (!args.isADryRun()) {
+ _lastVote.setTerm(args.getTerm());
+ _lastVote.setCandidateIndex(args.getCandidateIndex());
+ }
+ response->setVoteGranted(true);
+ }
+ }
+}
+
+void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) {
+ _lastVote = lastVote;
+}
+
+void TopologyCoordinatorImpl::voteForMyselfV1() {
+ _lastVote.setTerm(_term);
+ _lastVote.setCandidateIndex(_selfIndex);
+}
+
+void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) {
+ _currentPrimaryIndex = primaryIndex;
+}
+
+Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now,
+ StartElectionReason reason) {
+ if (_role == Role::leader) {
+ return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
+ }
+
+ if (_role == Role::candidate) {
+ return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
+ }
+
+ const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason);
+ if (unelectableReason) {
+ return {ErrorCodes::NodeNotElectable,
+ str::stream() << "Not standing for election because "
+ << _getUnelectableReasonString(unelectableReason)};
+ }
+
+ // All checks passed, become a candidate and start election proceedings.
+ _role = Role::candidate;
+
+ return Status::OK();
+}
+
+void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) {
+ _storageEngineSupportsReadCommitted =
+ supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
+}
+
+void TopologyCoordinatorImpl::restartHeartbeats() {
+ for (auto& hb : _memberData) {
+ hb.restart();
+ }
+}
+
+boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const {
+ // The smallest OpTime in PV1.
+ OpTime latest(Timestamp(0, 0), 0);
+ for (size_t i = 0; i < _memberData.size(); i++) {
+ auto& peer = _memberData[i];
+
+ if (static_cast<int>(i) == _selfIndex) {
+ continue;
+ }
+ // If any heartbeat is not fresh enough, return none.
+ if (!peer.isUpdatedSinceRestart()) {
+ return boost::none;
+ }
+ // Ignore down members
+ if (!peer.up()) {
+ continue;
+ }
+ if (peer.getHeartbeatAppliedOpTime() > latest) {
+ latest = peer.getHeartbeatAppliedOpTime();
+ }
}
- MONGO_UNREACHABLE;
+ return latest;
}
} // namespace repl
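// --- Editor's illustration (not part of this commit) ---------------------------------
// latestKnownOpTimeSinceHeartbeatRestart() above returns boost::none as soon as any peer
// has not answered a heartbeat since the restart, skips peers that are down, and otherwise
// keeps the highest applied optime it has seen. The Peer struct below is a hypothetical
// simplification invented for this sketch.
#include <algorithm>
#include <vector>
#include <boost/optional.hpp>

struct Peer {
    bool updatedSinceRestart;
    bool up;
    long long appliedOpTime;  // simplified stand-in for an OpTime
};

boost::optional<long long> latestKnownOpTime(const std::vector<Peer>& peers) {
    long long latest = 0;  // the smallest possible optime
    for (const Peer& peer : peers) {
        if (!peer.updatedSinceRestart) {
            return boost::none;  // a heartbeat response is still outstanding
        }
        if (!peer.up) {
            continue;  // ignore down members
        }
        latest = std::max(latest, peer.appliedOpTime);
    }
    return latest;
}
// --------------------------------------------------------------------------------------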
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index ed00fecd31f..046ff2217a6 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -28,737 +28,502 @@
#pragma once
-#include <iosfwd>
#include <string>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/db/repl/repl_set_heartbeat_response.h"
+#include <vector>
+
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/repl/last_vote.h"
+#include "mongo/db/repl/member_data.h"
+#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/util/net/hostandport.h"
+#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/server_options.h"
#include "mongo/util/time_support.h"
namespace mongo {
-class Timestamp;
+class OperationContext;
+
+namespace rpc {
+class ReplSetMetadata;
+} // namespace rpc
namespace repl {
-class HeartbeatResponseAction;
-class MemberData;
-class OpTime;
-class ReplSetHeartbeatArgs;
-class ReplSetConfig;
-class TagSubgroup;
-class LastVote;
-struct MemberState;
+static const Milliseconds UninitializedPing{-1};
/**
- * Replication Topology Coordinator interface.
+ * Represents a latency measurement for each replica set member based on heartbeat requests.
+ * The measurement is an average weighted 80% to the old value, and 20% to the new value.
*
- * This object is responsible for managing the topology of the cluster.
- * Tasks include consensus and leader election, chaining, and configuration management.
- * Methods of this class should be non-blocking.
+ * Also stores information about heartbeat progress and retries.
*/
-class TopologyCoordinator {
- MONGO_DISALLOW_COPYING(TopologyCoordinator);
-
+class PingStats {
public:
- class Role;
-
- virtual ~TopologyCoordinator();
-
/**
- * Different modes a node can be in while still reporting itself as in state PRIMARY.
- *
- * Valid transitions:
- *
- * kNotLeader <----------------------------------
- * | |
- * | |
- * | |
- * v |
- * kLeaderElect----- |
- * | | |
- * | | |
- * v | |
- * kMaster ------------------------- |
- * | ^ | | |
- * | | ------------------- | |
- * | | | | | |
- * v | v v v |
- * kAttemptingStepDown----------->kSteppingDown |
- * | | |
- * | | |
- * | | |
- * ---------------------------------------------
+ * Records that a new heartbeat request started at "now".
*
+ * This resets the failure count used in determining whether the next request to a target
+ * should be a retry or a regularly scheduled heartbeat message.
*/
- enum class LeaderMode {
- kNotLeader, // This node is not currently a leader.
- kLeaderElect, // This node has been elected leader, but can't yet accept writes.
- kMaster, // This node reports ismaster:true and can accept writes.
- kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete.
- kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail.
- };
-
- ////////////////////////////////////////////////////////////
- //
- // State inspection methods.
- //
- ////////////////////////////////////////////////////////////
+ void start(Date_t now);
/**
- * Gets the role of this member in the replication protocol.
+ * Records that a heartbeat request completed successfully, and that "millis" milliseconds
+ * were spent for a single network roundtrip plus remote processing time.
*/
- virtual Role getRole() const = 0;
+ void hit(Milliseconds millis);
/**
- * Gets the MemberState of this member in the replica set.
+ * Records that a heartbeat request failed.
*/
- virtual MemberState getMemberState() const = 0;
+ void miss();
/**
- * Returns whether this node should be allowed to accept writes.
+ * Gets the number of hit() calls.
*/
- virtual bool canAcceptWrites() const = 0;
+ unsigned int getCount() const {
+ return count;
+ }
/**
- * Returns true if this node is in the process of stepping down. Note that this can be
- * due to an unconditional stepdown that must succeed (for instance from learning about a new
- * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that
- * could fail if not enough nodes are caught up).
+ * Gets the weighted average round trip time for heartbeat messages to the target.
+ * Returns 0 if there have been no pings recorded yet.
*/
- virtual bool isSteppingDown() const = 0;
+ Milliseconds getMillis() const {
+ return value == UninitializedPing ? Milliseconds(0) : value;
+ }
/**
- * Returns the address of the current sync source, or an empty HostAndPort if there is no
- * current sync source.
+ * Gets the date at which start() was last called, which is used to determine if
+ * a heartbeat should be retried or if the time limit has expired.
*/
- virtual HostAndPort getSyncSourceAddress() const = 0;
+ Date_t getLastHeartbeatStartDate() const {
+ return _lastHeartbeatStartDate;
+ }
/**
- * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor
- * ourself.
+ * Gets the number of failures since start() was last called.
+ *
+ * This value is incremented by calls to miss(), cleared by calls to start() and
+ * set to the maximum possible value by calls to hit().
*/
- virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const = 0;
+ int getNumFailuresSinceLastStart() const {
+ return _numFailuresSinceLastStart;
+ }
- /**
- * Gets the earliest time the current node will stand for election.
- */
- virtual Date_t getStepDownTime() const = 0;
+private:
+ unsigned int count = 0;
+ Milliseconds value = UninitializedPing;
+ Date_t _lastHeartbeatStartDate;
+ int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
+};
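// --- Editor's illustration (not part of this commit) ---------------------------------
// The "80% old / 20% new" weighting described for PingStats above is an exponentially
// weighted moving average: value = (4 * old + new) / 5, with the first sample seeding the
// average. The helper below is a hypothetical sketch using plain millisecond counts rather
// than the Milliseconds type.
long long updatedPingAverage(long long currentAverageMillis,
                             long long newSampleMillis,
                             bool isFirstSample) {
    if (isFirstSample) {
        return newSampleMillis;  // no history yet; the sample becomes the average
    }
    return (currentAverageMillis * 4 + newSampleMillis) / 5;
}
// Example: an average of 100ms followed by a 200ms sample gives (4 * 100 + 200) / 5 = 120ms.
// --------------------------------------------------------------------------------------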
- /**
- * Gets the current value of the maintenance mode counter.
- */
- virtual int getMaintenanceCount() const = 0;
+class TopologyCoordinatorImpl : public TopologyCoordinator {
+public:
+ struct Options {
+ // A sync source is re-evaluated after it lags behind further than this amount.
+ Seconds maxSyncSourceLagSecs{0};
- /**
- * Gets the latest term this member is aware of. If this member is the primary,
- * it's the current term of the replica set.
- */
- virtual long long getTerm() = 0;
+ // Whether or not this node is running as a config server.
+ ClusterRole clusterRole{ClusterRole::None};
+ };
- enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm };
+ /**
+ * Constructs a Topology Coordinator object.
+ **/
+ TopologyCoordinatorImpl(Options options);
////////////////////////////////////////////////////////////
//
- // Basic state manipulation methods.
+ // Implementation of TopologyCoordinator interface
//
////////////////////////////////////////////////////////////
- /**
- * Sets the latest term this member is aware of to the higher of its current value and
- * the value passed in as "term".
- * Returns the result of setting the term value, or if a stepdown should be triggered.
- */
- virtual UpdateTermResult updateTerm(long long term, Date_t now) = 0;
-
- /**
- * Sets the index into the config used when we next choose a sync source
- */
- virtual void setForceSyncSourceIndex(int index) = 0;
-
- enum class ChainingPreference { kAllowChaining, kUseConfiguration };
-
- /**
- * Chooses and sets a new sync source, based on our current knowledge of the world.
- */
+ virtual Role getRole() const;
+ virtual MemberState getMemberState() const;
+ virtual bool canAcceptWrites() const override;
+ virtual bool isSteppingDown() const override;
+ virtual HostAndPort getSyncSourceAddress() const;
+ virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
+ virtual int getMaintenanceCount() const;
+ virtual long long getTerm();
+ virtual UpdateTermResult updateTerm(long long term, Date_t now);
+ virtual void setForceSyncSourceIndex(int index);
virtual HostAndPort chooseNewSyncSource(Date_t now,
const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) = 0;
-
- /**
- * Suppresses selecting "host" as sync source until "until".
- */
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0;
-
- /**
- * Removes a single entry "host" from the list of potential sync sources which we
- * have blacklisted, if it is supposed to be unblacklisted by "now".
- */
- virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now) = 0;
-
- /**
- * Clears the list of potential sync sources we have blacklisted.
- */
- virtual void clearSyncSourceBlacklist() = 0;
-
- /**
- * Determines if a new sync source should be chosen, if a better candidate sync source is
- * available. If the current sync source's last optime ("syncSourceLastOpTime" under
- * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than
- * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
- * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
- * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
- *
- * "now" is used to skip over currently blacklisted sync sources.
- *
- * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
- */
+ ChainingPreference chainingPreference) override;
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+ virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
+ virtual void clearSyncSourceBlacklist();
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const rpc::ReplSetMetadata& replMetadata,
boost::optional<rpc::OplogQueryMetadata> oqMetadata,
- Date_t now) const = 0;
-
- /**
- * Checks whether we are a single node set and we are not in a stepdown period. If so,
- * puts us into candidate mode, otherwise does nothing. This is used to ensure that
- * nodes in a single node replset become primary again when their stepdown period ends.
- */
- virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) = 0;
-
- /**
- * Sets the earliest time the current node will stand for election to "newTime".
- *
- * Until this time, while the node may report itself as electable, it will not stand
- * for election.
- */
- virtual void setElectionSleepUntil(Date_t newTime) = 0;
-
- /**
- * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or
- * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the
- * applier changes the reported member state of the current node, and enables or suppresses
- * electability of the current node. All modes but RS_SECONDARY indicate an unelectable
- * follower state (one that cannot transition to candidate).
- */
- virtual void setFollowerMode(MemberState::MS newMode) = 0;
-
- /**
- * Scan the memberData and determine the highest last applied or last
- * durable optime present on a majority of servers; set _lastCommittedOpTime to this
- * new entry.
- * Whether the last applied or last durable op time is used depends on whether
- * the config getWriteConcernMajorityShouldJournal is set.
- * Returns true if the _lastCommittedOpTime was changed.
- */
- virtual bool updateLastCommittedOpTime() = 0;
-
- /**
- * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the
- * current last committed OpTime. Returns true if _lastCommittedOpTime is changed.
- */
- virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime) = 0;
-
- /**
- * Returns the OpTime of the latest majority-committed op known to this server.
- */
- virtual OpTime getLastCommittedOpTime() const = 0;
-
- /**
- * Returns true if it's safe to transition to LeaderMode::kMaster.
- */
- virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const = 0;
-
- /**
- * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes
- * and are ready to fully become primary and start accepting writes.
- * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed
- * for this tenure as primary. This prevents entries from before our election from counting as
- * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed.
- * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes.
- */
- virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) = 0;
-
- /**
- * Adjusts the maintenance mode count by "inc".
- *
- * It is an error to call this method if getRole() does not return Role::follower.
- * It is an error to allow the maintenance count to go negative.
- */
- virtual void adjustMaintenanceCountBy(int inc) = 0;
-
- ////////////////////////////////////////////////////////////
- //
- // Methods that prepare responses to command requests.
- //
- ////////////////////////////////////////////////////////////
-
- // produces a reply to a replSetSyncFrom command
+ Date_t now) const;
+ virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
+ virtual void setElectionSleepUntil(Date_t newTime);
+ virtual void setFollowerMode(MemberState::MS newMode);
+ virtual bool updateLastCommittedOpTime();
+ virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
+ virtual OpTime getLastCommittedOpTime() const;
+ virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override;
+ virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override;
+ virtual void adjustMaintenanceCountBy(int inc);
virtual void prepareSyncFromResponse(const HostAndPort& target,
BSONObjBuilder* response,
- Status* result) = 0;
-
- // produce a reply to a replSetFresh command
+ Status* result);
virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
Date_t now,
BSONObjBuilder* response,
- Status* result) = 0;
-
- // produce a reply to a received electCmd
+ Status* result);
virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
Date_t now,
BSONObjBuilder* response,
- Status* result) = 0;
-
- // produce a reply to a heartbeat
+ Status* result);
virtual Status prepareHeartbeatResponse(Date_t now,
const ReplSetHeartbeatArgs& args,
const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) = 0;
-
- // produce a reply to a V1 heartbeat
+ ReplSetHeartbeatResponse* response);
virtual Status prepareHeartbeatResponseV1(Date_t now,
const ReplSetHeartbeatArgsV1& args,
const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) = 0;
-
- struct ReplSetStatusArgs {
- Date_t now;
- unsigned selfUptime;
- const OpTime& readConcernMajorityOpTime;
- const BSONObj& initialSyncStatus;
- };
-
- // produce a reply to a status request
+ ReplSetHeartbeatResponse* response);
virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
- Status* result) = 0;
-
- // Produce a replSetUpdatePosition command to be sent to the node's sync source.
+ Status* result);
virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
- OpTime currentCommittedSnapshotOpTime) const = 0;
-
- // produce a reply to an ismaster request. It is only valid to call this if we are a
- // replset.
- virtual void fillIsMasterForReplSet(IsMasterResponse* response) = 0;
-
- // Produce member data for the serverStatus command and diagnostic logging.
- virtual void fillMemberData(BSONObjBuilder* result) = 0;
-
- enum class PrepareFreezeResponseResult { kNoAction, kElectSelf };
-
- /**
- * Produce a reply to a freeze request. Returns a PostMemberStateUpdateAction on success that
- * may trigger state changes in the caller.
- */
- virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(
- Date_t now, int secs, BSONObjBuilder* response) = 0;
-
- ////////////////////////////////////////////////////////////
- //
- // Methods for sending and receiving heartbeats,
- // reconfiguring and handling the results of standing for
- // election.
- //
- ////////////////////////////////////////////////////////////
-
- /**
- * Updates the topology coordinator's notion of the replica set configuration.
- *
- * "newConfig" is the new configuration, and "selfIndex" is the index of this
- * node's configuration information in "newConfig", or "selfIndex" is -1 to
- * indicate that this node is not a member of "newConfig".
- *
- * newConfig.isInitialized() should be true, though implementations may accept
- * configurations where this is not true, for testing purposes.
- */
- virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) = 0;
-
- /**
- * Prepares a heartbeat request appropriate for sending to "target", assuming the
- * current time is "now". "ourSetName" is used as the name for our replica set if
- * the topology coordinator does not have a valid configuration installed.
- *
- * The returned pair contains proper arguments for a replSetHeartbeat command, and
- * an amount of time to wait for the response.
- *
- * This call should be paired (with intervening network communication) with a call to
- * processHeartbeatResponse for the same "target".
- */
+ OpTime currentCommittedSnapshotOpTime) const;
+
+ virtual void fillIsMasterForReplSet(IsMasterResponse* response);
+ virtual void fillMemberData(BSONObjBuilder* result);
+ virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
+ int secs,
+ BSONObjBuilder* response);
+ virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
- Date_t now, const std::string& ourSetName, const HostAndPort& target) = 0;
+ Date_t now, const std::string& ourSetName, const HostAndPort& target);
virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
- Date_t now, const std::string& ourSetName, const HostAndPort& target) = 0;
-
- /**
- * Processes a heartbeat response from "target" that arrived around "now", having
- * spent "networkRoundTripTime" millis on the network.
- *
- * Updates internal topology coordinator state, and returns instructions about what action
- * to take next.
- *
- * If the next action indicates StartElection, the topology coordinator has transitioned to
- * the "candidate" role, and will remain there until processWinElection or
- * processLoseElection are called.
- *
- * If the next action indicates "StepDownSelf", the topology coordinator has transitioned
- * to the "follower" role from "leader", and the caller should take any necessary actions
- * to become a follower.
- *
- * If the next action indicates "StepDownRemotePrimary", the caller should take steps to
- * cause the specified remote host to step down from primary to secondary.
- *
- * If the next action indicates "Reconfig", the caller should verify the configuration in
- * hbResponse is acceptable, perform any other reconfiguration actions it must, and call
- * updateConfig with the new configuration and the appropriate value for "selfIndex". It
- * must also wrap up any outstanding elections (by calling processLoseElection or
- * processWinElection) before calling updateConfig.
- *
- * This call should be paired (with intervening network communication) with a call to
- * prepareHeartbeatRequest for the same "target".
- */
+ Date_t now, const std::string& ourSetName, const HostAndPort& target);
virtual HeartbeatResponseAction processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse) = 0;
-
- /**
- * Returns whether or not at least 'numNodes' have reached the given opTime.
- * "durablyWritten" indicates whether the operation has to be durably applied.
- */
- virtual bool haveNumNodesReachedOpTime(const OpTime& opTime,
- int numNodes,
- bool durablyWritten) = 0;
-
- /**
- * Returns whether or not at least one node matching the tagPattern has reached
- * the given opTime.
- * "durablyWritten" indicates whether the operation has to be durably applied.
- */
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
+ virtual bool voteForMyself(Date_t now);
+ virtual void setElectionInfo(OID electionId, Timestamp electionOpTime);
+ virtual void processWinElection(OID electionId, Timestamp electionOpTime);
+ virtual void processLoseElection();
+ virtual Status checkShouldStandForElection(Date_t now) const;
+ virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message);
+ virtual bool attemptStepDown(long long termAtStart,
+ Date_t now,
+ Date_t waitUntil,
+ Date_t stepDownUntil,
+ bool force) override;
+ virtual bool isSafeToStepDown() override;
+ virtual bool prepareForUnconditionalStepDown() override;
+ virtual Status prepareForStepDownAttempt() override;
+ virtual void abortAttemptedStepDownIfNeeded() override;
+ virtual void finishUnconditionalStepDown() override;
+ virtual Date_t getStepDownTime() const;
+ virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
+ virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
+ virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response);
+ virtual void summarizeAsHtml(ReplSetHtmlSummary* output);
+ virtual void loadLastVote(const LastVote& lastVote);
+ virtual void voteForMyselfV1();
+ virtual void setPrimaryIndex(long long primaryIndex);
+ virtual int getCurrentPrimaryIndex() const;
+ virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
const ReplSetTagPattern& tagPattern,
- bool durablyWritten) = 0;
-
- /**
- * Returns a vector of members that have applied the operation with OpTime 'op'.
- * "durablyWritten" indicates whether the operation has to be durably applied.
- * "skipSelf" means to exclude this node whether or not the op has been applied.
- */
+ bool durablyWritten);
virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
bool durablyWritten,
- bool skipSelf) = 0;
+ bool skipSelf);
+ virtual bool setMemberAsDown(Date_t now, const int memberIndex) override;
+ virtual std::pair<int, Date_t> getStalestLiveMember() const;
+ virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now);
+ virtual void resetAllMemberTimeouts(Date_t now);
+ virtual void resetMemberTimeouts(Date_t now,
+ const stdx::unordered_set<HostAndPort>& member_set);
+ virtual OpTime getMyLastAppliedOpTime() const;
+ virtual OpTime getMyLastDurableOpTime() const;
+ virtual MemberData* getMyMemberData();
+ virtual MemberData* findMemberDataByMemberId(const int memberId);
+ virtual MemberData* findMemberDataByRid(const OID rid);
+ virtual MemberData* addSlaveMemberData(const OID rid);
+ virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
+ virtual void setStorageEngineSupportsReadCommitted(bool supported);
- /**
- * Marks a member as down from our perspective and returns a bool which indicates if we can no
- * longer see a majority of the nodes and thus should step down.
- */
- virtual bool setMemberAsDown(Date_t now, const int memberIndex) = 0;
+ virtual void restartHeartbeats();
- /**
- * Goes through the memberData and determines which member that is currently live
- * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are
- * no other members.
- */
- virtual std::pair<int, Date_t> getStalestLiveMember() const = 0;
+ virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
- /**
- * Go through the memberData, and mark nodes which haven't been updated
- * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which
- * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction.
- */
- virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now) = 0;
+ ////////////////////////////////////////////////////////////
+ //
+ // Test support methods
+ //
+ ////////////////////////////////////////////////////////////
- /**
- * Set all nodes in memberData to not stale with a lastUpdate of "now".
- */
- virtual void resetAllMemberTimeouts(Date_t now) = 0;
+ // Changes _memberState to newMemberState. Only for testing.
+ void changeMemberState_forTest(const MemberState& newMemberState,
+ const Timestamp& electionTime = Timestamp(0, 0));
- /**
- * Set all nodes in memberData that are present in member_set
- * to not stale with a lastUpdate of "now".
- */
- virtual void resetMemberTimeouts(Date_t now,
- const stdx::unordered_set<HostAndPort>& member_set) = 0;
+ // Sets "_electionTime" to "newElectionTime". Only for testing.
+ void _setElectionTime(const Timestamp& newElectionTime);
- /*
- * Returns the last optime that this node has applied, whether or not it has been journaled.
- */
- virtual OpTime getMyLastAppliedOpTime() const = 0;
+ // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests!
+ // TODO(spencer): Remove this once we can easily call for an election in unit tests to
+ // set the current primary.
+ void _setCurrentPrimaryForTest(int primaryIndex);
- /*
- * Returns the last optime that this node has applied and journaled.
- */
- virtual OpTime getMyLastDurableOpTime() const = 0;
+ // Returns _electionTime. Only used in unittests.
+ Timestamp getElectionTime() const;
- /*
- * Returns information we have on the state of this node.
- */
- virtual MemberData* getMyMemberData() = 0;
+ // Returns _electionId. Only used in unittests.
+ OID getElectionId() const;
- /*
- * Returns information we have on the state of the node identified by memberId. Returns
- * nullptr if memberId is not found in the configuration.
- */
- virtual MemberData* findMemberDataByMemberId(const int memberId) = 0;
+private:
+ enum UnelectableReason {
+ None = 0,
+ CannotSeeMajority = 1 << 0,
+ NotCloseEnoughToLatestOptime = 1 << 1,
+ ArbiterIAm = 1 << 2,
+ NotSecondary = 1 << 3,
+ NoPriority = 1 << 4,
+ StepDownPeriodActive = 1 << 5,
+ NoData = 1 << 6,
+ NotInitialized = 1 << 7,
+ VotedTooRecently = 1 << 8,
+ RefusesToStand = 1 << 9,
+ NotCloseEnoughToLatestForPriorityTakeover = 1 << 10,
+ NotFreshEnoughForCatchupTakeover = 1 << 11,
+ };
+ typedef int UnelectableReasonMask;
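// --- Editor's illustration (not part of this commit) ---------------------------------
// UnelectableReasonMask is a plain bitmask, so several UnelectableReason values can be set
// at once and then tested bit by bit. A minimal hypothetical helper:
inline bool maskHasReason(int mask, int reason) {
    return (mask & reason) != 0;
}
// e.g. a mask of (NotSecondary | NoPriority) reports true for both of those bits and false
// for CannotSeeMajority.
// --------------------------------------------------------------------------------------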
- /*
- * Returns information we have on the state of the node identified by rid. Returns
- * nullptr if rid is not found in the heartbeat data. This method is used only for
- * master/slave replication.
- */
- virtual MemberData* findMemberDataByRid(const OID rid) = 0;
- /*
- * Adds and returns a memberData entry for the given RID.
- * Used only in master/slave mode.
- */
- virtual MemberData* addSlaveMemberData(const OID rid) = 0;
+ // Set what type of PRIMARY this node currently is.
+ void _setLeaderMode(LeaderMode mode);
- /**
- * If getRole() == Role::candidate and this node has not voted too recently, updates the
- * lastVote tracker and returns true. Otherwise, returns false.
- */
- virtual bool voteForMyself(Date_t now) = 0;
+ // Returns the number of heartbeat pings which have occurred.
+ int _getTotalPings();
- /**
- * Sets lastVote to be for ourself in this term.
- */
- virtual void voteForMyselfV1() = 0;
+ // Returns the current "ping" value for the given member by their address
+ Milliseconds _getPing(const HostAndPort& host);
- /**
- * Sets election id and election optime.
- */
- virtual void setElectionInfo(OID electionId, Timestamp electionOpTime) = 0;
+ // Determines if we will veto the member specified by "args.id".
+ // If we veto, the errmsg will be filled in with a reason
+ bool _shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t& now,
+ std::string* errmsg) const;
- /**
- * Performs state updates associated with winning an election.
- *
- * It is an error to call this if the topology coordinator is not in candidate mode.
- *
- * Exactly one of either processWinElection or processLoseElection must be called if
- * processHeartbeatResponse returns StartElection, to exit candidate mode.
- */
- virtual void processWinElection(OID electionId, Timestamp electionOpTime) = 0;
+ // Returns the index of the member with the matching id, or -1 if none match.
+ int _getMemberIndex(int id) const;
- /**
- * Performs state updates associated with losing an election.
- *
- * It is an error to call this if the topology coordinator is not in candidate mode.
- *
- * Exactly one of either processWinElection or processLoseElection must be called if
- * processHeartbeatResponse returns StartElection, to exit candidate mode.
- */
- virtual void processLoseElection() = 0;
+ // Checks whether a majority of the votes are held by members that are currently "up"
+ bool _aMajoritySeemsToBeUp() const;
- /**
- * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used
- * when we receive a stepdown command (which can fail if not enough secondaries are caught up)
- * to ensure that we never process more than one stepdown request at a time.
- * Returns OK if it is safe to continue with the stepdown attempt, or returns
- * ConflictingOperationInProgess if this node is already processing a stepdown request of any
- * kind.
- */
- virtual Status prepareForStepDownAttempt() = 0;
+ // Checks if the node can see a healthy primary of equal or greater priority to the
+ // candidate. If so, returns the index of that node. Otherwise returns -1.
+ int _findHealthyPrimaryOfEqualOrGreaterPriority(const int candidateIndex) const;
- /**
- * If this node is still attempting to process a stepdown attempt, aborts the attempt and
- * returns this node to normal primary/master state. If this node has already completed
- * stepping down or is now in the process of handling an unconditional stepdown, then this
- * method does nothing.
- */
- virtual void abortAttemptedStepDownIfNeeded() = 0;
+ // Is otherOpTime close enough (within 10 seconds) to the latest known optime to qualify
+ // for an election
+ bool _isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const;
- /**
- * Tries to transition the coordinator from the leader role to the follower role.
- *
- * A step down succeeds based on the following conditions:
- *
- * C1. 'force' is true and now > waitUntil
- *
- * C2. A majority set of nodes, M, in the replica set have optimes greater than or
- * equal to the last applied optime of the primary.
- *
- * C3. There exists at least one electable secondary node in the majority set M.
- *
- *
- * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method
- * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more
- * time to pass could make it succeed, returns false. If the whole stepdown attempt should be
- * abandoned (for example because the time limit expired or because we've already stepped down),
- * throws an exception.
- * TODO(spencer): Unify with the finishUnconditionalStepDown() method.
- */
- virtual bool attemptStepDown(
- long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) = 0;
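// --- Editor's illustration (not part of this commit) ---------------------------------
// The stepdown rules C1-C3 documented above reduce to "force and past the wait deadline"
// OR "a majority is caught up AND that majority contains an electable secondary". The
// predicate below restates that combination; every input is a hypothetical boolean the
// caller would have to compute.
bool stepDownMayComplete(bool force,
                         bool nowPastWaitUntil,           // C1: now > waitUntil
                         bool majorityCaughtUp,           // C2
                         bool electableSecondaryCaughtUp  // C3
                         ) {
    if (force && nowPastWaitUntil) {
        return true;
    }
    return majorityCaughtUp && electableSecondaryCaughtUp;
}
// --------------------------------------------------------------------------------------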
+ // Is our optime close enough to the latest known optime to call for a priority takeover.
+ bool _amIFreshEnoughForPriorityTakeover() const;
- /**
- * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument.
- * This is essentially checking conditions C2 and C3 as described in the comment to
- * attemptStepDown().
- */
- virtual bool isSafeToStepDown() = 0;
+ // Is the primary node still in catchup mode and is our optime the latest
+ // known optime of all the up nodes.
+ bool _amIFreshEnoughForCatchupTakeover() const;
- /**
- * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process
- * of an unconditional step down. If we are in the middle of a stepdown command attempt when
- * this is called then this unconditional stepdown will supersede the stepdown attempt, which
- * will cause the stepdown to fail. When this returns true it must be followed by a call to
- * finishUnconditionalStepDown() that is called when holding the global X lock.
- */
- virtual bool prepareForUnconditionalStepDown() = 0;
+ // Returns reason why "self" member is unelectable
+ UnelectableReasonMask _getMyUnelectableReason(const Date_t now,
+ StartElectionReason reason) const;
- /**
- * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the
- * global exclusive lock so we can't actually stepdown at that moment. When that happens
- * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown())
- * and schedule work to stepdown in the global X lock. This method is called after holding the
- * global lock to perform the actual stepdown.
- * TODO(spencer): Unify with the finishAttemptedStepDown() method.
- */
- virtual void finishUnconditionalStepDown() = 0;
+ // Returns reason why memberIndex is unelectable
+ UnelectableReasonMask _getUnelectableReason(int memberIndex) const;
- /**
- * Considers whether or not this node should stand for election, and returns true
- * if the node has transitioned to candidate role as a result of the call.
- */
- virtual Status checkShouldStandForElection(Date_t now) const = 0;
+ // Returns the nice text of why the node is unelectable
+ std::string _getUnelectableReasonString(UnelectableReasonMask ur) const;
- /**
- * Set the outgoing heartbeat message from self
- */
- virtual void setMyHeartbeatMessage(const Date_t now, const std::string& s) = 0;
+ // Return true if we are currently primary
+ bool _iAmPrimary() const;
- /**
- * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp
- * information.
- */
- virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const = 0;
+ // Scans through all members that are 'up' and return the latest known optime.
+ OpTime _latestKnownOpTime() const;
- /**
- * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary,
- * lastOpApplied, and lastOpCommitted.
- */
- virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const = 0;
+ // Scans the electable set and returns the highest priority member index
+ int _getHighestPriorityElectableIndex(Date_t now) const;
- /**
- * Writes into 'output' all the information needed to generate a summary of the current
- * replication state for use by the web interface.
- */
- virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0;
+ // Returns true if "one" member is higher priority than "two" member
+ bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const;
- /**
- * Prepares a ReplSetRequestVotesResponse.
- */
- virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response) = 0;
+ // Helper shortcut to self config
+ const MemberConfig& _selfConfig() const;
- /**
- * Loads an initial LastVote document, which was read from local storage.
- *
- * Called only during replication startup. All other updates are done internally.
- */
- virtual void loadLastVote(const LastVote& lastVote) = 0;
+ // Helper shortcut to self member data
+ const MemberData& _selfMemberData() const;
- /**
- * Updates the current primary index.
- */
- virtual void setPrimaryIndex(long long primaryIndex) = 0;
+ // Index of self member in member data.
+ const int _selfMemberDataIndex() const;
- /**
- * Returns the current primary index.
- */
- virtual int getCurrentPrimaryIndex() const = 0;
-
- enum StartElectionReason {
- kElectionTimeout,
- kPriorityTakeover,
- kStepUpRequest,
- kCatchupTakeover
- };
+ // Returns NULL if there is no primary, or the MemberConfig* for the current primary
+ const MemberConfig* _currentPrimaryMember() const;
/**
- * Transitions to the candidate role if the node is electable.
+ * Updates "_currentPrimaryIndex" for processHeartbeatResponse() and determines whether an
+ * election or stepdown should commence.
+ * _updatePrimaryFromHBDataV1() is a simplified version of _updatePrimaryFromHBData() to be used
+ * when in ProtocolVersion1.
*/
- virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason) = 0;
+ HeartbeatResponseAction _updatePrimaryFromHBData(int updatedConfigIndex,
+ const MemberState& originalState,
+ Date_t now);
+ HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex,
+ const MemberState& originalState,
+ Date_t now);
/**
- * Updates the storage engine read committed support in the TopologyCoordinator options after
- * creation.
+ * Updates _memberData based on the newConfig, ensuring that every member in the newConfig
+ * has an entry in _memberData. If any nodes in the newConfig are also present in
+ * _currentConfig, copies their heartbeat info into the corresponding entry in the updated
+ * _memberData vector.
*/
- virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0;
+ void _updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
/**
- * Reset the booleans to record the last heartbeat restart.
+ * Returns whether a stepdown attempt should be allowed to proceed. See the comment for
+ * attemptStepDown() for more details on the rules of when stepdown attempts succeed or fail.
*/
- virtual void restartHeartbeats() = 0;
+ bool _canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force);
- /**
- * Scans through all members that are 'up' and return the latest known optime, if we have
- * received (successful or failed) heartbeats from all nodes since heartbeat restart.
- *
- * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted
- * heartbeats.
- * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down.
- */
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0;
+ void _stepDownSelfAndReplaceWith(int newPrimary);
-protected:
- TopologyCoordinator() {}
-};
-
-/**
- * Type that denotes the role of a node in the replication protocol.
- *
- * The role is distinct from MemberState, in that it only deals with the
- * roles a node plays in the basic protocol -- leader, follower and candidate.
- * The mapping between MemberState and Role is complex -- several MemberStates
- * map to the follower role, and MemberState::RS_SECONDARY maps to either
- * follower or candidate roles, e.g.
- */
-class TopologyCoordinator::Role {
-public:
/**
- * Constant indicating leader role.
- */
- static const Role leader;
+ * Looks up the provided member in the blacklist and returns true if the member's blacklist
+ * expire time is after 'now'. If the member is found but the expire time is before 'now',
+ * the function returns false. If the member is not found in the blacklist, the function
+ * returns false.
+ **/
+ bool _memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const;
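// --- Editor's illustration (not part of this commit) ---------------------------------
// The sync source blacklist described above is a map from host to expiry time; a member
// counts as blacklisted only while its stored expiry is still in the future. Hypothetical
// sketch using std::string hosts and integer timestamps instead of HostAndPort and Date_t.
#include <map>
#include <string>

bool isBlacklisted(const std::map<std::string, long long>& blacklist,
                   const std::string& host,
                   long long now) {
    auto it = blacklist.find(host);
    return it != blacklist.end() && it->second > now;
}
// --------------------------------------------------------------------------------------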
/**
- * Constant indicating follower role.
- */
- static const Role follower;
-
- /**
- * Constant indicating candidate role
- */
- static const Role candidate;
-
- Role() {}
-
- bool operator==(Role other) const {
- return _value == other._value;
- }
- bool operator!=(Role other) const {
- return _value != other._value;
- }
-
- std::string toString() const;
-
-private:
- explicit Role(int value);
+ * Returns true if we are a one-node replica set, we're the one member,
+ * we're electable, we're not in maintenance mode, and we are currently in followerMode
+ * SECONDARY.
+ *
+ * This is used to decide if we should transition to Role::candidate in a one-node replica set.
+ */
+ bool _isElectableNodeInSingleNodeReplicaSet() const;
+
+ // This node's role in the replication protocol.
+ Role _role;
+
+ // This is a unique id that is generated and set each time we transition to PRIMARY, as the
+ // result of an election.
+ OID _electionId;
+ // The time at which the current PRIMARY was elected.
+ Timestamp _electionTime;
+
+ // This node's election term. The term is used as part of the consensus algorithm to elect
+ // and maintain one primary (leader) node in the cluster.
+ long long _term;
+
+ // the index of the member we currently believe is primary, if one exists, otherwise -1
+ int _currentPrimaryIndex;
+
+ // the hostandport we are currently syncing from
+ // empty if no sync source (we are primary, or we cannot connect to anyone yet)
+ HostAndPort _syncSource;
+ // These members are not chosen as sync sources for a period of time, due to connection
+ // issues with them
+ std::map<HostAndPort, Date_t> _syncSourceBlacklist;
+ // The next sync source to be chosen, requested via a replSetSyncFrom command
+ int _forceSyncSourceIndex;
+
+ // Options for this TopologyCoordinator
+ Options _options;
+
+ // "heartbeat message"
+ // Sent to other members in the "hbm" field of heartbeat responses.
+ std::string _hbmsg;
+ Date_t _hbmsgTime; // when it was logged
+
+ // heartbeat msg to send to others; descriptive diagnostic info
+ std::string _getHbmsg(Date_t now) const;
+
+ int _selfIndex; // this node's index in _members and _currentConfig
+
+ ReplSetConfig _rsConfig; // The current config, including a vector of MemberConfigs
+
+ // Heartbeat, current applied/durable optime, and other state data for each member. It is
+ // guaranteed that this vector will be maintained in the same order as the MemberConfigs in
+ // _currentConfig, therefore the member config index can be used to index into this vector as
+ // well.
+ std::vector<MemberData> _memberData;
+
+ // Time when stepDown command expires
+ Date_t _stepDownUntil;
+
+ // A time before which this node will not stand for election.
+ // In protocol version 1, this is used to prevent running for election after seeing
+ // a new term.
+ Date_t _electionSleepUntil;
+
+ // OpTime of the latest committed operation.
+ OpTime _lastCommittedOpTime;
+
+ // OpTime representing our transition to PRIMARY and the start of our term.
+ // _lastCommittedOpTime cannot be set to an earlier OpTime.
+ OpTime _firstOpTimeOfMyTerm;
+
+ // The number of calls we have had to enter maintenance mode
+ int _maintenanceModeCalls;
+
+ // The sub-mode of follower that we are in. Legal values are RS_SECONDARY, RS_RECOVERING,
+ // RS_STARTUP2 (initial sync) and RS_ROLLBACK. Only meaningful if _role == Role::follower.
+ // Configured via setFollowerMode(). If the sub-mode is RS_SECONDARY, then the effective
+ // sub-mode is either RS_SECONDARY or RS_RECOVERING, depending on _maintenanceModeCalls.
+ // Rather than accessing this variable directly, one should use the getMemberState() method,
+ // which computes the replica set node state on the fly.
+ MemberState::MS _followerMode;
+
+ // What type of PRIMARY this node currently is. Don't set this directly, call _setLeaderMode
+ // instead.
+ LeaderMode _leaderMode = LeaderMode::kNotLeader;
+
+ typedef std::map<HostAndPort, PingStats> PingMap;
+ // Ping stats for each member, keyed by HostAndPort.
+ PingMap _pings;
+
+ // Last vote info from the election
+ struct VoteLease {
+ static const Seconds leaseTime;
+
+ Date_t when;
+ int whoId = -1;
+ HostAndPort whoHostAndPort;
+ } _voteLease;
+
+ // V1 last vote info for elections
+ LastVote _lastVote{OpTime::kInitialTerm, -1};
+
+ enum class ReadCommittedSupport {
+ kUnknown,
+ kNo,
+ kYes,
+ };
- int _value;
+ // Whether or not the storage engine supports read committed.
+ ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
};
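// --- Editor's illustration (not part of this commit) ---------------------------------
// A sketch of how the Options struct declared above might feed the constructor, mirroring
// how a replication coordinator would be expected to wire it up. It assumes this header
// ("mongo/db/repl/topology_coordinator.h" after this change) and the repository's Seconds
// and ClusterRole types; it is an illustration, not code from this commit.
#include <memory>
#include <utility>

std::unique_ptr<TopologyCoordinator> makeTopologyCoordinatorForExample() {
    TopologyCoordinatorImpl::Options options;
    options.maxSyncSourceLagSecs = Seconds(30);
    options.clusterRole = ClusterRole::None;
    return std::unique_ptr<TopologyCoordinator>(
        new TopologyCoordinatorImpl(std::move(options)));
}
// --------------------------------------------------------------------------------------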
-//
-// Convenience method for unittest code. Please use accessors otherwise.
-//
-
-std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role);
-std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result);
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
deleted file mode 100644
index 84f4ccbedfe..00000000000
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ /dev/null
@@ -1,3239 +0,0 @@
-/**
- * Copyright 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/topology_coordinator_impl.h"
-
-#include <limits>
-
-#include "mongo/db/audit.h"
-#include "mongo/db/client.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/heartbeat_response_action.h"
-#include "mongo/db/repl/is_master_response.h"
-#include "mongo/db/repl/isself.h"
-#include "mongo/db/repl/repl_set_heartbeat_args.h"
-#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
-#include "mongo/db/repl/repl_set_heartbeat_response.h"
-#include "mongo/db/repl/repl_set_html_summary.h"
-#include "mongo/db/repl/repl_set_request_votes_args.h"
-#include "mongo/db/repl/rslog.h"
-#include "mongo/db/repl/update_position_args.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/rpc/metadata/oplog_query_metadata.h"
-#include "mongo/rpc/metadata/repl_set_metadata.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/hex.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace repl {
-using std::vector;
-const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30);
-
-// Controls how caught up in replication a secondary with higher priority than the current primary
-// must be before it will call for a priority takeover election.
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2);
-
-// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore
-// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be
-// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds.
-MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs);
-
-namespace {
-
-template <typename T>
-int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
- return static_cast<int>(it - vec.begin());
-}
-
-// Maximum number of retries for a failed heartbeat.
-const int kMaxHeartbeatRetries = 2;
-
-/**
- * Returns true if the only up heartbeats are auth errors.
- */
-bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) {
- bool foundAuthError = false;
- for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) {
- if (indexOfIterator(hbdata, it) == selfIndex) {
- continue;
- }
-
- if (it->up()) {
- return false;
- }
-
- if (it->hasAuthIssue()) {
- foundAuthError = true;
- }
- }
-
- return foundAuthError;
-}
-
-void appendOpTime(BSONObjBuilder* bob,
- const char* elemName,
- const OpTime& opTime,
- const long long pv) {
- if (pv == 1) {
- opTime.append(bob, elemName);
- } else {
- bob->append(elemName, opTime.getTimestamp());
- }
-}
-} // namespace
-
-void PingStats::start(Date_t now) {
- _lastHeartbeatStartDate = now;
- _numFailuresSinceLastStart = 0;
-}
-
-void PingStats::hit(Milliseconds millis) {
- _numFailuresSinceLastStart = std::numeric_limits<int>::max();
- ++count;
-
- value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5);
-}
-
-void PingStats::miss() {
- ++_numFailuresSinceLastStart;
-}
-
-TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options)
- : _role(Role::follower),
- _term(OpTime::kUninitializedTerm),
- _currentPrimaryIndex(-1),
- _forceSyncSourceIndex(-1),
- _options(std::move(options)),
- _selfIndex(-1),
- _maintenanceModeCalls(0),
- _followerMode(MemberState::RS_STARTUP2) {
- invariant(getMemberState() == MemberState::RS_STARTUP);
- // Need an entry for self in the memberHearbeatData.
- _memberData.emplace_back();
- _memberData.back().setIsSelf(true);
-}
-
-TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const {
- return _role;
-}
-
-void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
- invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
- _forceSyncSourceIndex = index;
-}
-
-HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
- return _syncSource;
-}
-
-HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
- const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) {
- // If we are not a member of the current replica set configuration, no sync source is valid.
- if (_selfIndex == -1) {
- LOG(1) << "Cannot sync from any members because we are not in the replica set config";
- return HostAndPort();
- }
-
- // if we have a target we've requested to sync from, use it
- if (_forceSyncSourceIndex != -1) {
- invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
- _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
- _forceSyncSourceIndex = -1;
- log() << "choosing sync source candidate by request: " << _syncSource;
- std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
- << " by request");
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
- }
-
- // wait for 2N pings (not counting ourselves) before choosing a sync target
- int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings();
-
- if (needMorePings > 0) {
- OCCASIONALLY log() << "waiting for " << needMorePings
- << " pings from other members before syncing";
- _syncSource = HostAndPort();
- return _syncSource;
- }
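// --- Editor's illustration (not part of this commit) ---------------------------------
// The "2N pings" gate above: with N other members, the node waits for 2 * N successful
// heartbeat pings before it is willing to pick a sync source. Hypothetical helper:
int pingsStillNeeded(int totalOtherMembers, int totalPingsSoFar) {
    return totalOtherMembers * 2 - totalPingsSoFar;  // > 0 means keep waiting
}
// Example: 4 other members (a 5-node set) and 6 pings so far -> 4 * 2 - 6 = 2 more needed.
// --------------------------------------------------------------------------------------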
-
- // If we are only allowed to sync from the primary, set that
- if (chainingPreference == ChainingPreference::kUseConfiguration &&
- !_rsConfig.isChainingAllowed()) {
- if (_currentPrimaryIndex == -1) {
- LOG(1) << "Cannot select a sync source because chaining is"
- " not allowed and primary is unknown/down";
- _syncSource = HostAndPort();
- return _syncSource;
- } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
- LOG(1) << "Cannot select a sync source because chaining is not allowed and primary "
- "member is blacklisted: "
- << _currentPrimaryMember()->getHostAndPort();
- _syncSource = HostAndPort();
- return _syncSource;
- } else if (_currentPrimaryIndex == _selfIndex) {
- LOG(1)
- << "Cannot select a sync source because chaining is not allowed and we are primary";
- _syncSource = HostAndPort();
- return _syncSource;
- } else {
- _syncSource = _currentPrimaryMember()->getHostAndPort();
- log() << "chaining not allowed, choosing primary as sync source candidate: "
- << _syncSource;
- std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
- }
- }
-
- // find the member with the lowest ping time that is ahead of me
-
- // choose a time that will exclude no candidates by default, in case we don't see a primary
- OpTime oldestSyncOpTime;
-
- // Find primary's oplog time. Reject sync candidates that are more than
- // _options.maxSyncSourceLagSecs seconds behind.
- if (_currentPrimaryIndex != -1) {
- OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime();
-
- // Check if primaryOpTime is still close to 0 because we haven't received
- // our first heartbeat from a new primary yet.
- unsigned int maxLag =
- static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs));
- if (primaryOpTime.getSecs() >= maxLag) {
- oldestSyncOpTime =
- OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm());
- }
- }
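// --- Editor's illustration (not part of this commit) ---------------------------------
// The lag window computed above rejects sync candidates whose last applied optime is more
// than maxSyncSourceLagSecs behind the primary's. With plain seconds standing in for OpTime
// the cutoff looks like this (hypothetical helper):
unsigned int oldestAcceptableSecs(unsigned int primaryOpTimeSecs, unsigned int maxLagSecs) {
    if (primaryOpTimeSecs < maxLagSecs) {
        return 0;  // primary optime still near zero: exclude no candidates
    }
    return primaryOpTimeSecs - maxLagSecs;
}
// Example: a primary at t=1000s with a 30s window rejects candidates older than t=970s.
// --------------------------------------------------------------------------------------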
-
- int closestIndex = -1;
-
- // Make two attempts, with less restrictive rules the second time.
- //
- // During the first attempt, we ignore those nodes that have a larger slave
- // delay, hidden nodes or non-voting, and nodes that are excessively behind.
- //
- // For the second attempt include those nodes, in case those are the only ones we can reach.
- //
- // This loop attempts to set 'closestIndex', to select a viable candidate.
- for (int attempts = 0; attempts < 2; ++attempts) {
- for (std::vector<MemberData>::const_iterator it = _memberData.begin();
- it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- // Don't consider ourselves.
- if (itIndex == _selfIndex) {
- continue;
- }
-
- const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));
-
- // Candidate must be up to be considered.
- if (!it->up()) {
- LOG(2) << "Cannot select sync source because it is not up: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
- // Candidate must be PRIMARY or SECONDARY state to be considered.
- if (!it->getState().readable()) {
- LOG(2) << "Cannot select sync source because it is not readable: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
-
- // On the first attempt, we skip candidates that do not match these criteria.
- if (attempts == 0) {
- // Candidate must be a voter if we are a voter.
- if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) {
- LOG(2) << "Cannot select sync source because we are a voter and it is not: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
- // Candidates must not be hidden.
- if (itMemberConfig.isHidden()) {
- LOG(2) << "Cannot select sync source because it is hidden: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
- // Candidates cannot be excessively behind.
- if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) {
- LOG(2) << "Cannot select sync source because it is too far behind."
- << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort()
- << ": " << it->getHeartbeatAppliedOpTime()
- << ", oldest acceptable optime: " << oldestSyncOpTime;
- continue;
- }
- // Candidate must not have a configured delay larger than ours.
- if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) {
- LOG(2) << "Cannot select sync source with larger slaveDelay than ours: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
- }
- // Candidate must build indexes if we build indexes, to be considered.
- if (_selfConfig().shouldBuildIndexes()) {
- if (!itMemberConfig.shouldBuildIndexes()) {
- LOG(2) << "Cannot select sync source with shouldBuildIndex differences: "
- << itMemberConfig.getHostAndPort();
- continue;
- }
- }
- // only consider candidates that are ahead of where we are
- if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
- LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
- << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
- << ", latest oplog optime of sync candidate "
- << itMemberConfig.getHostAndPort() << ": "
- << it->getHeartbeatAppliedOpTime().toBSON();
- continue;
- }
- // Candidate cannot be more latent than anything we've already considered.
- if ((closestIndex != -1) &&
- (_getPing(itMemberConfig.getHostAndPort()) >
- _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
- LOG(2) << "Cannot select sync source with higher latency than the best candidate: "
- << itMemberConfig.getHostAndPort();
-
- continue;
- }
- // Candidate cannot be blacklisted.
- if (_memberIsBlacklisted(itMemberConfig, now)) {
- LOG(1) << "Cannot select sync source which is blacklisted: "
- << itMemberConfig.getHostAndPort();
-
- continue;
- }
- // This candidate has passed all tests; set 'closestIndex'
- closestIndex = itIndex;
- }
- if (closestIndex != -1)
- break; // no need for second attempt
- }
-
- if (closestIndex == -1) {
- // Did not find any members to sync from
- std::string msg("could not find member to sync from");
- // Only log when we had a valid sync source before
- if (!_syncSource.empty()) {
- log() << msg << rsLog;
- }
- setMyHeartbeatMessage(now, msg);
-
- _syncSource = HostAndPort();
- return _syncSource;
- }
- _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
- log() << "sync source candidate: " << _syncSource;
- std::string msg(str::stream() << "syncing from: " << _syncSource.toString());
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
-}
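
The selection logic above reduces to a two-pass filter followed by a lowest-ping tie-break. The following is a minimal, standalone sketch of that shape; CandidateView and its fields are hypothetical stand-ins for the real MemberData/MemberConfig classes, the optime comparison is simplified to seconds, and the list is assumed to already exclude ourselves and blacklisted hosts.

#include <cstdint>
#include <vector>

// Hypothetical, simplified view of one sync source candidate.
struct CandidateView {
    bool up;               // responded to recent heartbeats
    bool readable;         // in PRIMARY or SECONDARY state
    bool strictExclusion;  // hidden, non-voting (when we vote), larger slave delay, etc.
    int64_t appliedSecs;   // seconds component of its last applied optime
    int64_t pingMillis;    // measured round-trip time
};

// Returns the index of the chosen candidate, or -1 if none qualifies.
// 'oldestAcceptableSecs' plays the role of the primary's optime minus maxSyncSourceLagSecs.
int chooseSyncSourceSketch(const std::vector<CandidateView>& candidates,
                           int64_t myLastFetchedSecs,
                           int64_t oldestAcceptableSecs) {
    int closest = -1;
    // Two attempts: the second drops the stricter first-pass exclusions.
    for (int attempt = 0; attempt < 2 && closest == -1; ++attempt) {
        for (int i = 0; i < static_cast<int>(candidates.size()); ++i) {
            const CandidateView& c = candidates[i];
            if (!c.up || !c.readable)
                continue;
            if (attempt == 0 &&
                (c.strictExclusion || c.appliedSecs < oldestAcceptableSecs))
                continue;
            // Only consider candidates that are ahead of our last fetched optime.
            if (c.appliedSecs <= myLastFetchedSecs)
                continue;
            // Among the survivors, keep the one with the lowest ping.
            if (closest == -1 || c.pingMillis < candidates[closest].pingMillis)
                closest = i;
        }
    }
    return closest;
}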
-
-bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig,
- Date_t now) const {
- std::map<HostAndPort, Date_t>::const_iterator blacklisted =
- _syncSourceBlacklist.find(memberConfig.getHostAndPort());
- if (blacklisted != _syncSourceBlacklist.end()) {
- if (blacklisted->second > now) {
- return true;
- }
- }
- return false;
-}
-
-void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
- LOG(2) << "blacklisting " << host << " until " << until.toString();
- _syncSourceBlacklist[host] = until;
-}
-
-void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
- std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
- if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
- LOG(2) << "unblacklisting " << host;
- _syncSourceBlacklist.erase(hostItr);
- }
-}
-
-void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
- _syncSourceBlacklist.clear();
-}
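
The three blacklist helpers above amount to a map of expiry times that is consulted lazily. A compact sketch of the same pattern, with std::string and a plain integer clock standing in for HostAndPort and Date_t:

#include <cstdint>
#include <map>
#include <string>

class SyncSourceBlacklistSketch {
public:
    void blacklist(const std::string& host, int64_t until) {
        _until[host] = until;
    }
    // An entry only blocks selection while its expiry lies in the future.
    bool isBlacklisted(const std::string& host, int64_t now) const {
        auto it = _until.find(host);
        return it != _until.end() && it->second > now;
    }
    // Lazily drop an entry once its expiry has passed.
    void unblacklistIfExpired(const std::string& host, int64_t now) {
        auto it = _until.find(host);
        if (it != _until.end() && now >= it->second)
            _until.erase(it);
    }

private:
    std::map<std::string, int64_t> _until;
};

Entries are not purged proactively; they simply stop mattering once 'now' passes the stored expiry, which is why the unblacklist path checks the timestamp before erasing.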
-
-void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
- BSONObjBuilder* response,
- Status* result) {
- response->append("syncFromRequested", target.toString());
-
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync");
- return;
- }
-
- const MemberConfig& selfConfig = _selfConfig();
- if (selfConfig.isArbiter()) {
- *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
- return;
- }
- if (_selfIndex == _currentPrimaryIndex) {
- *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
- return;
- }
-
- ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
- int targetIndex = 0;
- for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
- ++it) {
- if (it->getHostAndPort() == target) {
- targetConfig = it;
- break;
- }
- ++targetIndex;
- }
- if (targetConfig == _rsConfig.membersEnd()) {
- *result = Status(ErrorCodes::NodeNotFound,
- str::stream() << "Could not find member \"" << target.toString()
- << "\" in replica set");
- return;
- }
- if (targetIndex == _selfIndex) {
- *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
- return;
- }
- if (targetConfig->isArbiter()) {
- *result = Status(ErrorCodes::InvalidOptions,
- str::stream() << "Cannot sync from \"" << target.toString()
- << "\" because it is an arbiter");
- return;
- }
- if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
- *result = Status(ErrorCodes::InvalidOptions,
- str::stream() << "Cannot sync from \"" << target.toString()
- << "\" because it does not build indexes");
- return;
- }
-
- if (selfConfig.isVoter() && !targetConfig->isVoter()) {
- *result = Status(ErrorCodes::InvalidOptions,
- str::stream() << "Cannot sync from \"" << target.toString()
- << "\" because it is not a voter");
- return;
- }
-
- const MemberData& hbdata = _memberData.at(targetIndex);
- if (hbdata.hasAuthIssue()) {
- *result =
- Status(ErrorCodes::Unauthorized,
- str::stream() << "not authorized to communicate with " << target.toString());
- return;
- }
- if (hbdata.getHealth() == 0) {
- *result =
- Status(ErrorCodes::HostUnreachable,
- str::stream() << "I cannot reach the requested member: " << target.toString());
- return;
- }
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
- warning() << "attempting to sync from " << target << ", but its latest opTime is "
- << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is "
- << lastOpApplied.getSecs() << " so this may not work";
- response->append("warning",
- str::stream() << "requested member \"" << target.toString()
- << "\" is more than 10 seconds behind us");
- // not returning bad Status, just warning
- }
-
- HostAndPort prevSyncSource = getSyncSourceAddress();
- if (!prevSyncSource.empty()) {
- response->append("prevSyncTarget", prevSyncSource.toString());
- }
-
- setForceSyncSourceIndex(targetIndex);
- *result = Status::OK();
-}
-
-void TopologyCoordinatorImpl::prepareFreshResponse(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t now,
- BSONObjBuilder* response,
- Status* result) {
- if (_rsConfig.getProtocolVersion() != 0) {
- *result = Status(ErrorCodes::BadValue,
- str::stream() << "replset: incompatible replset protocol version: "
- << _rsConfig.getProtocolVersion());
- return;
- }
-
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::ReplicaSetNotFound,
- "Cannot participate in elections because not initialized");
- return;
- }
-
- if (args.setName != _rsConfig.getReplSetName()) {
- *result =
- Status(ErrorCodes::ReplicaSetNotFound,
- str::stream() << "Wrong repl set name. Expected: " << _rsConfig.getReplSetName()
- << ", received: "
- << args.setName);
- return;
- }
-
- if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
- *result = Status(ErrorCodes::BadValue,
- str::stream() << "Received replSetFresh command from member with the "
- "same member ID as ourself: "
- << args.id);
- return;
- }
-
- bool weAreFresher = false;
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- if (_rsConfig.getConfigVersion() > args.cfgver) {
- log() << "replSet member " << args.who << " is not yet aware its cfg version "
- << args.cfgver << " is stale";
- response->append("info", "config version stale");
- weAreFresher = true;
- }
- // check not only our own optime, but any other member we can reach
- else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) {
- weAreFresher = true;
- }
- response->appendDate("opTime",
- Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
- response->append("fresher", weAreFresher);
-
- std::string errmsg;
- bool doVeto = _shouldVetoMember(args, now, &errmsg);
- response->append("veto", doVeto);
- if (doVeto) {
- response->append("errmsg", errmsg);
- }
- *result = Status::OK();
-}
-
-bool TopologyCoordinatorImpl::_shouldVetoMember(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t& now,
- std::string* errmsg) const {
- if (_rsConfig.getConfigVersion() < args.cfgver) {
- // We are stale; do not veto.
- return false;
- }
-
- const unsigned int memberID = args.id;
- const int hopefulIndex = _getMemberIndex(memberID);
- invariant(hopefulIndex != _selfIndex);
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
-
- if (hopefulIndex == -1) {
- *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
- return true;
- }
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- if (_iAmPrimary() &&
- lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) {
- // hbinfo is not updated for ourself, so if we are primary we have to check the
- // primary's last optime separately
- *errmsg = str::stream() << "I am already primary, "
- << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " can try again once I've stepped down";
- return true;
- }
-
- if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
- (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >=
- _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) {
- // other members might be aware of more up-to-date nodes
- *errmsg =
- str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " is trying to elect itself but "
- << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
- << " is already primary and more up-to-date";
- return true;
- }
-
- if ((highestPriorityIndex != -1)) {
- const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
- const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
-
- if (priorityMember.getPriority() > hopefulMember.getPriority()) {
- *errmsg = str::stream() << hopefulMember.getHostAndPort().toString()
- << " has lower priority of " << hopefulMember.getPriority()
- << " than " << priorityMember.getHostAndPort().toString()
- << " which has a priority of " << priorityMember.getPriority();
- return true;
- }
- }
-
- UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex);
- reason &= ~RefusesToStand;
- if (reason) {
- *errmsg = str::stream() << "I don't think "
- << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " is electable because the "
- << _getUnelectableReasonString(reason);
- return true;
- }
-
- return false;
-}
-
-// produce a reply to a received electCmd
-void TopologyCoordinatorImpl::prepareElectResponse(
- const ReplicationCoordinator::ReplSetElectArgs& args,
- const Date_t now,
- BSONObjBuilder* response,
- Status* result) {
- if (_rsConfig.getProtocolVersion() != 0) {
- *result = Status(ErrorCodes::BadValue,
- str::stream() << "replset: incompatible replset protocol version: "
- << _rsConfig.getProtocolVersion());
- return;
- }
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::ReplicaSetNotFound,
- "Cannot participate in election because not initialized");
- return;
- }
-
- const long long myver = _rsConfig.getConfigVersion();
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
-
- const MemberConfig* primary = _currentPrimaryMember();
- const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
- const MemberConfig* highestPriority =
- highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex);
-
- int vote = 0;
- if (args.set != _rsConfig.getReplSetName()) {
- log() << "replSet error received an elect request for '" << args.set
- << "' but our set name is '" << _rsConfig.getReplSetName() << "'";
- } else if (myver < args.cfgver) {
- // we are stale. don't vote
- log() << "replSetElect not voting because our config version is stale. Our version: "
- << myver << ", their version: " << args.cfgver;
- } else if (myver > args.cfgver) {
- // they are stale!
- log() << "replSetElect command received stale config version # during election. "
- "Our version: "
- << myver << ", their version: " << args.cfgver;
- vote = -10000;
- } else if (!hopeful) {
- log() << "replSetElect couldn't find member with id " << args.whoid;
- vote = -10000;
- } else if (_iAmPrimary()) {
- log() << "I am already primary, " << hopeful->getHostAndPort().toString()
- << " can try again once I've stepped down";
- vote = -10000;
- } else if (primary) {
- log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
- << primary->getHostAndPort().toString() << " is already primary";
- vote = -10000;
- } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
- // TODO(spencer): What if the lower-priority member is more up-to-date?
- log() << hopeful->getHostAndPort().toString() << " has lower priority than "
- << highestPriority->getHostAndPort().toString();
- vote = -10000;
- } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) {
- log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for "
- << _voteLease.whoHostAndPort.toString() << ' '
- << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
- } else {
- _voteLease.when = now;
- _voteLease.whoId = args.whoid;
- _voteLease.whoHostAndPort = hopeful->getHostAndPort();
- vote = _selfConfig().getNumVotes();
- invariant(hopeful->getId() == args.whoid);
- if (vote > 0) {
- log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " ("
- << args.whoid << ')';
- }
- }
-
- response->append("vote", vote);
- response->append("round", args.round);
- *result = Status::OK();
-}
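
The vote-lease branch near the end of prepareElectResponse enforces one rule: within leaseTime of casting a yea vote, refuse to vote for any other candidate. A standalone sketch of just that rule, with plain integers standing in for Date_t, Seconds, and member ids:

#include <cstdint>

struct VoteLeaseSketch {
    int64_t whenSecs = 0;   // when the last yea vote was cast
    int64_t whoId = -1;     // which candidate received it
    int64_t leaseSecs;      // plays the role of VoteLease::leaseTime

    explicit VoteLeaseSketch(int64_t lease) : leaseSecs(lease) {}

    // True if we may vote for 'candidateId' at time 'nowSecs'.
    bool mayVoteFor(int64_t candidateId, int64_t nowSecs) const {
        const bool leaseStillActive = whenSecs + leaseSecs >= nowSecs;
        return !leaseStillActive || whoId == candidateId;
    }

    void recordYeaVote(int64_t candidateId, int64_t nowSecs) {
        whenSecs = nowSecs;
        whoId = candidateId;
    }
};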
-
-// produce a reply to a heartbeat
-Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) {
- if (args.getProtocolVersion() != 1) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "replset: incompatible replset protocol version: "
- << args.getProtocolVersion());
- }
-
- // Verify that replica set names match
- const std::string rshb = args.getSetName();
- if (ourSetName != rshb) {
- log() << "replSet set names do not match, ours: " << ourSetName
- << "; remote node's: " << rshb;
- response->noteMismatched();
- return Status(ErrorCodes::InconsistentReplicaSetNames,
- str::stream() << "Our set name of " << ourSetName << " does not match name "
- << rshb
- << " reported by remote node");
- }
-
- const MemberState myState = getMemberState();
- if (_selfIndex == -1) {
- if (myState.removed()) {
- return Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set configuration is invalid or does not include us");
- }
- } else {
- invariant(_rsConfig.getReplSetName() == args.getSetName());
- if (args.getSenderId() == _selfConfig().getId()) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "Received heartbeat from member with the same "
- "member ID as ourself: "
- << args.getSenderId());
- }
- }
-
- // This is a replica set
- response->noteReplSet();
-
- response->setSetName(ourSetName);
- response->setState(myState.s);
- if (myState.primary()) {
- response->setElectionTime(_electionTime);
- }
-
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- const OpTime lastOpDurable = getMyLastDurableOpTime();
-
- // Are we electable
- response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout));
-
- // Heartbeat status message
- response->setHbMsg(_getHbmsg(now));
- response->setTime(duration_cast<Seconds>(now - Date_t{}));
- response->setAppliedOpTime(lastOpApplied);
- response->setDurableOpTime(lastOpDurable);
-
- if (!_syncSource.empty()) {
- response->setSyncingTo(_syncSource);
- }
-
- if (!_rsConfig.isInitialized()) {
- response->setConfigVersion(-2);
- return Status::OK();
- }
-
- const long long v = _rsConfig.getConfigVersion();
- response->setConfigVersion(v);
- // Deliver new config if caller's version is older than ours
- if (v > args.getConfigVersion()) {
- response->setConfig(_rsConfig);
- }
-
- // Resolve the caller's id in our Member list
- int from = -1;
- if (v == args.getConfigVersion() && args.getSenderId() != -1) {
- from = _getMemberIndex(args.getSenderId());
- }
- if (from == -1) {
- // Can't find the member, so we leave out the stateDisagreement field
- return Status::OK();
- }
- invariant(from != _selfIndex);
-
- // if we thought that this node is down, let it know
- if (!_memberData.at(from).up()) {
- response->noteStateDisagreement();
- }
-
- // note that we got a heartbeat from this node
- _memberData.at(from).setLastHeartbeatRecv(now);
- return Status::OK();
-}
-
-Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) {
- // Verify that replica set names match
- const std::string rshb = args.getSetName();
- if (ourSetName != rshb) {
- log() << "replSet set names do not match, ours: " << ourSetName
- << "; remote node's: " << rshb;
- return Status(ErrorCodes::InconsistentReplicaSetNames,
- str::stream() << "Our set name of " << ourSetName << " does not match name "
- << rshb
- << " reported by remote node");
- }
-
- const MemberState myState = getMemberState();
- if (_selfIndex == -1) {
- if (myState.removed()) {
- return Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set configuration is invalid or does not include us");
- }
- } else {
- if (args.getSenderId() == _selfConfig().getId()) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "Received heartbeat from member with the same "
- "member ID as ourself: "
- << args.getSenderId());
- }
- }
-
- response->setSetName(ourSetName);
-
- response->setState(myState.s);
-
- if (myState.primary()) {
- response->setElectionTime(_electionTime);
- }
-
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- const OpTime lastOpDurable = getMyLastDurableOpTime();
- response->setAppliedOpTime(lastOpApplied);
- response->setDurableOpTime(lastOpDurable);
-
- if (_currentPrimaryIndex != -1) {
- response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
- }
-
- response->setTerm(_term);
-
- if (!_syncSource.empty()) {
- response->setSyncingTo(_syncSource);
- }
-
- if (!_rsConfig.isInitialized()) {
- response->setConfigVersion(-2);
- return Status::OK();
- }
-
- const long long v = _rsConfig.getConfigVersion();
- response->setConfigVersion(v);
- // Deliver new config if caller's version is older than ours
- if (v > args.getConfigVersion()) {
- response->setConfig(_rsConfig);
- }
-
- // Resolve the caller's id in our Member list
- int from = -1;
- if (v == args.getConfigVersion() && args.getSenderId() != -1) {
- from = _getMemberIndex(args.getSenderId());
- }
- if (from == -1) {
- return Status::OK();
- }
- invariant(from != _selfIndex);
-
- // note that we got a heartbeat from this node
- _memberData.at(from).setLastHeartbeatRecv(now);
- return Status::OK();
-}
-
-int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
- int index = 0;
- for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
- ++it, ++index) {
- if (it->getId() == id) {
- return index;
- }
- }
- return -1;
-}
-
-std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest(
- Date_t now, const std::string& ourSetName, const HostAndPort& target) {
- PingStats& hbStats = _pings[target];
- Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
- if (!_rsConfig.isInitialized() ||
- (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
- (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
- // This is either the first request ever for "target", the heartbeat timeout has
- // passed, or we have exhausted our retries, so we're starting a "new" heartbeat.
- hbStats.start(now);
- alreadyElapsed = Milliseconds(0);
- }
- ReplSetHeartbeatArgs hbArgs;
- hbArgs.setProtocolVersion(1);
- hbArgs.setCheckEmpty(false);
- if (_rsConfig.isInitialized()) {
- hbArgs.setSetName(_rsConfig.getReplSetName());
- hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
- if (_selfIndex >= 0) {
- const MemberConfig& me = _selfConfig();
- hbArgs.setSenderHost(me.getHostAndPort());
- hbArgs.setSenderId(me.getId());
- }
- } else {
- hbArgs.setSetName(ourSetName);
- hbArgs.setConfigVersion(-2);
- }
- if (serverGlobalParams.featureCompatibility.getVersion() !=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
- hbArgs.setHeartbeatVersion(1);
- }
-
- const Milliseconds timeoutPeriod(
- _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
- : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
- const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
- return std::make_pair(hbArgs, timeout);
-}
-
-std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1(
- Date_t now, const std::string& ourSetName, const HostAndPort& target) {
- PingStats& hbStats = _pings[target];
- Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
- if (!_rsConfig.isInitialized() ||
- (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
- (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
- // This is either the first request ever for "target", the heartbeat timeout has
- // passed, or we have exhausted our retries, so we're starting a "new" heartbeat.
- hbStats.start(now);
- alreadyElapsed = Milliseconds(0);
- }
- ReplSetHeartbeatArgsV1 hbArgs;
- if (_rsConfig.isInitialized()) {
- hbArgs.setSetName(_rsConfig.getReplSetName());
- hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
- if (_selfIndex >= 0) {
- const MemberConfig& me = _selfConfig();
- hbArgs.setSenderId(me.getId());
- hbArgs.setSenderHost(me.getHostAndPort());
- }
- hbArgs.setTerm(_term);
- } else {
- hbArgs.setSetName(ourSetName);
- // Config version -2 is for uninitialized config.
- hbArgs.setConfigVersion(-2);
- hbArgs.setTerm(OpTime::kInitialTerm);
- }
- if (serverGlobalParams.featureCompatibility.getVersion() !=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
- hbArgs.setHeartbeatVersion(1);
- }
-
- const Milliseconds timeoutPeriod(
- _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
- : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
- const Milliseconds timeout(timeoutPeriod - alreadyElapsed);
- return std::make_pair(hbArgs, timeout);
-}
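
Both request builders above share the same timeout bookkeeping: restart the heartbeat attempt when the retry budget (kMaxHeartbeatRetries) or the timeout period is exhausted, and otherwise hand the remaining budget to the next request. A sketch in plain milliseconds, with HeartbeatAttemptSketch as a hypothetical stand-in for PingStats:

#include <cstdint>

struct HeartbeatAttemptSketch {
    int64_t startMillis = 0;  // when the current attempt began
    int failures = 0;         // failures since the attempt began
};

// Returns the network timeout for the next heartbeat request, restarting the
// attempt when it is exhausted, as prepareHeartbeatRequest/V1 do above.
int64_t nextHeartbeatTimeoutMillis(HeartbeatAttemptSketch& attempt,
                                   int64_t nowMillis,
                                   int64_t timeoutPeriodMillis,
                                   int maxRetries) {
    int64_t alreadyElapsed = nowMillis - attempt.startMillis;
    if (attempt.failures > maxRetries || alreadyElapsed >= timeoutPeriodMillis) {
        // Start a fresh attempt with the full timeout budget.
        attempt.startMillis = nowMillis;
        attempt.failures = 0;
        alreadyElapsed = 0;
    }
    return timeoutPeriodMillis - alreadyElapsed;
}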
-
-HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
- Date_t now,
- Milliseconds networkRoundTripTime,
- const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
- const MemberState originalState = getMemberState();
- PingStats& hbStats = _pings[target];
- invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
- const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
- (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
- if (!hbResponse.isOK()) {
- if (isUnauthorized) {
- hbStats.hit(networkRoundTripTime);
- } else {
- hbStats.miss();
- }
- } else {
- hbStats.hit(networkRoundTripTime);
- // Log diagnostics.
- if (hbResponse.getValue().isStateDisagreement()) {
- LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
- }
- }
-
- // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order
- // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to
- // see the majority of the cluster.
- //
- // Arbiters also decrease their heartbeat interval to at most half the election timeout period.
- Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval();
- if (_rsConfig.getProtocolVersion() == 1) {
- if (getMemberState().arbiter()) {
- heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
- _rsConfig.getHeartbeatInterval());
- } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) {
- heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
- _rsConfig.getHeartbeatInterval() / 4);
- }
- }
-
- const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
- Date_t nextHeartbeatStartDate;
- // Determine next heartbeat start time.
- if (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries &&
- alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod()) {
- // There are still retries left, let's use one.
- nextHeartbeatStartDate = now;
- } else {
- nextHeartbeatStartDate = now + heartbeatInterval;
- }
-
- if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
- const long long currentConfigVersion =
- _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
- const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
- if (newConfig.getConfigVersion() > currentConfigVersion) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- } else {
- // Could be that we got the newer version before we got the response, or the
- // target erroneously sent us one even though it isn't newer.
- if (newConfig.getConfigVersion() < currentConfigVersion) {
- LOG(1) << "Config version from heartbeat was older than ours.";
- } else {
- LOG(2) << "Config from heartbeat response was same as ours.";
- }
- if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
- ::mongo::LogstreamBuilder::severityCast(2))) {
- LogstreamBuilder lsb = log();
- if (_rsConfig.isInitialized()) {
- lsb << "Current config: " << _rsConfig.toBSON() << "; ";
- }
- lsb << "Config in heartbeat: " << newConfig.toBSON();
- }
- }
- }
-
- // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
- // so return early.
- if (!_rsConfig.isInitialized()) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- }
- // If we're not in the config, we don't need to respond to heartbeats.
- if (_selfIndex == -1) {
- LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target
- << " -- current config: " << _rsConfig.toBSON();
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- }
- const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
- if (memberIndex == -1) {
- LOG(1) << "Could not find " << target << " in current config so ignoring --"
- " current config: "
- << _rsConfig.toBSON();
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- }
-
- invariant(memberIndex != _selfIndex);
-
- MemberData& hbData = _memberData.at(memberIndex);
- const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
- bool advancedOpTime = false;
- if (!hbResponse.isOK()) {
- if (isUnauthorized) {
- hbData.setAuthIssue(now);
- } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries ||
- alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod()) {
- hbData.setDownValues(now, hbResponse.getStatus().reason());
- } else {
- LOG(3) << "Bad heartbeat response from " << target << "; trying again; Retries left: "
- << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; "
- << alreadyElapsed << " have already elapsed";
- }
- } else {
- ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
- LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
- << ", msg: " << hbr.getHbMsg();
- advancedOpTime = hbData.setUpValues(now, std::move(hbr));
- }
-
- HeartbeatResponseAction nextAction;
- if (_rsConfig.getProtocolVersion() == 0) {
- nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now);
- } else {
- nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
- }
-
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- nextAction.setAdvancedOpTime(advancedOpTime);
- return nextAction;
-}
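
The scheduling decision earlier in processHeartbeatResponse boils down to: retry immediately while both retries and timeout budget remain, otherwise wait a full heartbeat interval. A one-function sketch with hypothetical names:

#include <cstdint>

// Returns when the next heartbeat to this target should start.
int64_t nextHeartbeatStartSketch(int64_t nowMillis,
                                 int64_t alreadyElapsedMillis,
                                 int failuresSinceLastStart,
                                 int maxRetries,
                                 int64_t timeoutPeriodMillis,
                                 int64_t heartbeatIntervalMillis) {
    const bool retriesLeft = failuresSinceLastStart <= maxRetries &&
        alreadyElapsedMillis < timeoutPeriodMillis;
    return retriesLeft ? nowMillis : nowMillis + heartbeatIntervalMillis;
}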
-
-bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
- int numNodes,
- bool durablyWritten) {
- // Replication progress that is for some reason ahead of us should not allow us to
- // satisfy a write concern if we aren't caught up ourselves.
- OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
- if (myOpTime < targetOpTime) {
- return false;
- }
-
- for (auto&& memberData : _memberData) {
- const OpTime& memberOpTime =
- durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
- if (memberOpTime >= targetOpTime) {
- --numNodes;
- }
-
- if (numNodes <= 0) {
- return true;
- }
- }
- return false;
-}
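
haveNumNodesReachedOpTime is a straightforward count-down over per-member optimes, guarded by the requirement that this node has itself reached the target. A standalone sketch using integer optimes in place of the OpTime class:

#include <cstdint>
#include <vector>

bool haveNumNodesReachedSketch(const std::vector<int64_t>& memberOpTimes,
                               int64_t myOpTime,
                               int64_t targetOpTime,
                               int numNodes) {
    // Progress reported from elsewhere cannot satisfy a write concern
    // that we have not caught up to ourselves.
    if (myOpTime < targetOpTime)
        return false;
    for (int64_t opTime : memberOpTimes) {
        if (opTime >= targetOpTime)
            --numNodes;
        if (numNodes <= 0)
            return true;
    }
    return false;
}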
-
-bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime,
- const ReplSetTagPattern& tagPattern,
- bool durablyWritten) {
- ReplSetTagMatch matcher(tagPattern);
- for (auto&& memberData : _memberData) {
- const OpTime& memberOpTime =
- durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
- if (memberOpTime >= opTime) {
- // This node has reached the desired optime, now we need to check if it is a part
- // of the tagPattern.
- int memberIndex = memberData.getConfigIndex();
- invariant(memberIndex >= 0);
- const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex);
- for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
- it != memberConfig.tagsEnd();
- ++it) {
- if (matcher.update(*it)) {
- return true;
- }
- }
- }
- }
- return false;
-}
-
-HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) {
- bool stepdown = false;
- for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) {
- auto& memberData = _memberData[memberIndex];
- if (!memberData.isSelf() && !memberData.lastUpdateStale() &&
- now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) {
- memberData.markLastUpdateStale();
- if (_iAmPrimary()) {
- stepdown = stepdown || setMemberAsDown(now, memberIndex);
- }
- }
- }
- if (stepdown) {
- log() << "can't see a majority of the set, relinquishing primary";
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
- }
- return HeartbeatResponseAction::makeNoAction();
-}
-
-std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
- bool durablyWritten,
- bool skipSelf) {
- std::vector<HostAndPort> hosts;
- for (const auto& memberData : _memberData) {
- if (skipSelf && memberData.isSelf()) {
- continue;
- }
-
- if (durablyWritten) {
- if (memberData.getLastDurableOpTime() < op) {
- continue;
- }
- } else if (memberData.getLastAppliedOpTime() < op) {
- continue;
- }
-
- hosts.push_back(memberData.getHostAndPort());
- }
- return hosts;
-}
-
-bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) {
- invariant(memberIndex != _selfIndex);
- invariant(memberIndex != -1);
- invariant(_currentPrimaryIndex == _selfIndex);
- MemberData& hbData = _memberData.at(memberIndex);
- hbData.setDownValues(now, "no response within election timeout period");
-
- if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
- return true;
- }
-
- return false;
-}
-
-std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const {
- Date_t earliestDate = Date_t::max();
- int earliestMemberId = -1;
- for (const auto& memberData : _memberData) {
- if (memberData.isSelf()) {
- continue;
- }
- if (memberData.lastUpdateStale()) {
- // Already stale.
- continue;
- }
- LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate();
- if (earliestDate > memberData.getLastUpdate()) {
- earliestDate = memberData.getLastUpdate();
- earliestMemberId = memberData.getMemberId();
- }
- }
- LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate;
- return std::make_pair(earliestMemberId, earliestDate);
-}
-
-void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) {
- for (auto&& memberData : _memberData)
- memberData.updateLiveness(now);
-}
-
-void TopologyCoordinatorImpl::resetMemberTimeouts(
- Date_t now, const stdx::unordered_set<HostAndPort>& member_set) {
- for (auto&& memberData : _memberData) {
- if (member_set.count(memberData.getHostAndPort()))
- memberData.updateLiveness(now);
- }
-}
-
-OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const {
- return _selfMemberData().getLastAppliedOpTime();
-}
-
-OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const {
- return _selfMemberData().getLastDurableOpTime();
-}
-
-MemberData* TopologyCoordinatorImpl::getMyMemberData() {
- return &_memberData[_selfMemberDataIndex()];
-}
-
-MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) {
- const int memberIndex = _getMemberIndex(memberId);
- if (memberIndex >= 0)
- return &_memberData[memberIndex];
- return nullptr;
-}
-
-MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) {
- for (auto& memberData : _memberData) {
- if (memberData.getRid() == rid)
- return &memberData;
- }
- return nullptr;
-}
-
-MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) {
- invariant(!_memberData.empty()); // Must always have our own entry first.
- invariant(!_rsConfig.isInitialized()); // Used only for master-slave.
- _memberData.emplace_back();
- auto* result = &_memberData.back();
- result->setRid(rid);
- return result;
-}
-
-HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
- int updatedConfigIndex, const MemberState& originalState, Date_t now) {
- //
- // Updates the local notion of which remote node, if any, is primary.
- // Start the priority takeover process if we are eligible.
- //
-
- invariant(updatedConfigIndex != _selfIndex);
-
- // If we are missing from the config, do not participate in primary maintenance or election.
- if (_selfIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
- // If we are the primary, there must be no other primary, otherwise its higher term would
- // have already made us step down.
- if (_currentPrimaryIndex == _selfIndex) {
- return HeartbeatResponseAction::makeNoAction();
- }
-
- // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
- int primaryIndex = -1;
- for (size_t i = 0; i < _memberData.size(); i++) {
- const MemberData& member = _memberData.at(i);
- if (member.getState().primary() && member.up()) {
- if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) {
- primaryIndex = i;
- }
- }
- }
- _currentPrimaryIndex = primaryIndex;
- if (_currentPrimaryIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
-
- // Clear last heartbeat message on ourselves.
- setMyHeartbeatMessage(now, "");
-
- // Consider a takeover only when the replica set is stable.
- //
- // Take over the primary only if the remote primary is in the latest term I know.
- // This is done only when we get a heartbeat response from the primary.
- // Otherwise, there must be an outstanding election, which may succeed or not, but
- // the remote primary will become aware of that election eventually and step down.
- if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {
-
- // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
- bool catchupTakeoverDisabled =
- ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
- ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();
- if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() <
- _memberData.at(_selfIndex).getLastAppliedOpTime())) {
- LOG(2) << "I can take over the primary due to fresher data."
- << " Current primary index: " << primaryIndex << " in term "
- << _memberData.at(primaryIndex).getTerm() << "."
- << " Current primary optime: "
- << _memberData.at(primaryIndex).getLastAppliedOpTime()
- << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();
-
- return HeartbeatResponseAction::makeCatchupTakeoverAction();
- }
-
- if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
- _rsConfig.getMemberAt(_selfIndex).getPriority()) {
- LOG(4) << "I can take over the primary due to higher priority."
- << " Current primary index: " << primaryIndex << " in term "
- << _memberData.at(primaryIndex).getTerm();
-
- return HeartbeatResponseAction::makePriorityTakeoverAction();
- }
- }
- return HeartbeatResponseAction::makeNoAction();
-}
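
The tail of _updatePrimaryFromHBDataV1 is effectively a three-way decision, taken only when the heartbeat came from the current primary in my own term. A condensed, hypothetical sketch of that decision:

#include <cstdint>

enum class TakeoverSketch { kNone, kCatchup, kPriority };

TakeoverSketch decideTakeoverSketch(bool heartbeatFromPrimaryInMyTerm,
                                    bool catchupTakeoverEnabled,
                                    int64_t primaryAppliedOpTime,
                                    int64_t myAppliedOpTime,
                                    double primaryPriority,
                                    double myPriority) {
    if (!heartbeatFromPrimaryInMyTerm)
        return TakeoverSketch::kNone;
    // Fresher data than the primary: schedule a catchup takeover.
    if (catchupTakeoverEnabled && primaryAppliedOpTime < myAppliedOpTime)
        return TakeoverSketch::kCatchup;
    // Higher priority than the primary: schedule a priority takeover.
    if (primaryPriority < myPriority)
        return TakeoverSketch::kPriority;
    return TakeoverSketch::kNone;
}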
-
-HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
- int updatedConfigIndex, const MemberState& originalState, Date_t now) {
- // This method has two interrelated responsibilities, performed in two phases.
- //
- // First, it updates the local notion of which remote node, if any, is primary. In the
- // process, it may request a remote primary to step down because there is a higher priority
- // node waiting, or because the local node thinks it is primary and that it has a more
- // recent electionTime. It may instead decide that the local node should step down itself,
- // because a remote has a more recent election time.
- //
- // Second, if there is no remote primary, and the local node is not primary, it considers
- // whether or not to stand for election.
- invariant(updatedConfigIndex != _selfIndex);
-
- // We are missing from the config, so do not participate in primary maintenance or election.
- if (_selfIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
-
- ////////////////////
- // Phase 1
- ////////////////////
-
- // If we believe the node whose data was just updated is primary, confirm that
- // the updated data supports that notion. If not, erase our notion of who is primary.
- if (updatedConfigIndex == _currentPrimaryIndex) {
- const MemberData& updatedHBData = _memberData.at(updatedConfigIndex);
- if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
- _currentPrimaryIndex = -1;
- }
- }
-
- // If the current primary is not highest priority and up to date (within 10s),
- // have them/me stepdown.
- if (_currentPrimaryIndex != -1) {
- // check if we should ask the primary (possibly ourselves) to step down
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
- if (highestPriorityIndex != -1) {
- const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
- const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
- const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
- ? getMyLastAppliedOpTime()
- : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime();
-
- if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
- _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) {
- const OpTime latestOpTime = _latestKnownOpTime();
-
- if (_iAmPrimary()) {
- if (_leaderMode == LeaderMode::kSteppingDown) {
- return HeartbeatResponseAction::makeNoAction();
- }
- log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
- << ") because " << highestPriorityMember.getHostAndPort()
- << " has higher priority " << highestPriorityMember.getPriority()
- << " and is only "
- << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
- << " seconds behind me";
- const Date_t until =
- now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
- if (_electionSleepUntil < until) {
- _electionSleepUntil = until;
- }
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
- } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
- // If this node is the highest priority node, and it is not in
- // an inter-election sleep period, ask the current primary to step down.
- // This is an optimization, because the remote primary will almost certainly
- // notice this node's electability promptly, via its own heartbeat process.
- log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
- << " (priority " << currentPrimaryMember.getPriority()
- << ") step down because I have higher priority "
- << highestPriorityMember.getPriority() << " and am only "
- << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
- << " seconds behind it";
- int primaryIndex = _currentPrimaryIndex;
- _currentPrimaryIndex = -1;
- return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
- }
- }
- }
- }
-
- // Scan the member list's heartbeat data for who is primary, and update
- // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
- {
- int remotePrimaryIndex = -1;
- for (std::vector<MemberData>::const_iterator it = _memberData.begin();
- it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- if (itIndex == _selfIndex) {
- continue;
- }
-
- if (it->getState().primary() && it->up()) {
- if (remotePrimaryIndex != -1) {
- // two other nodes think they are primary (asynchronously polled)
- // -- wait for things to settle down.
- warning() << "two remote primaries (transiently)";
- return HeartbeatResponseAction::makeNoAction();
- }
- remotePrimaryIndex = itIndex;
- }
- }
-
- if (remotePrimaryIndex != -1) {
- // If it's the same as last time, don't do anything further.
- if (_currentPrimaryIndex == remotePrimaryIndex) {
- return HeartbeatResponseAction::makeNoAction();
- }
- // Clear last heartbeat message on ourselves (why?)
- setMyHeartbeatMessage(now, "");
-
- // If we are also primary, this is a problem. Determine who should step down.
- if (_iAmPrimary()) {
- Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime();
- log() << "another primary seen with election time " << remoteElectionTime
- << " my election time is " << _electionTime;
-
- // Step down whomever has the older election time.
- if (remoteElectionTime > _electionTime) {
- if (_leaderMode == LeaderMode::kSteppingDown) {
- return HeartbeatResponseAction::makeNoAction();
- }
- log() << "stepping down; another primary was elected more recently";
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
- } else {
- log() << "another PRIMARY detected and it should step down"
- " since it was elected earlier than me";
- return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
- }
- }
-
- _currentPrimaryIndex = remotePrimaryIndex;
- return HeartbeatResponseAction::makeNoAction();
- }
- }
-
- ////////////////////
- // Phase 2
- ////////////////////
-
- // We do not believe any remote to be primary.
-
- // If we are primary, check if we can still see majority of the set;
- // stepdown if we can't.
- if (_iAmPrimary()) {
- if (CannotSeeMajority &
- _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
- if (_leaderMode == LeaderMode::kSteppingDown) {
- return HeartbeatResponseAction::makeNoAction();
- }
- log() << "can't see a majority of the set, relinquishing primary";
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
- }
-
- LOG(2) << "Choosing to remain primary";
- return HeartbeatResponseAction::makeNoAction();
- }
-
- fassert(18505, _currentPrimaryIndex == -1);
-
- const MemberState currentState = getMemberState();
- if (originalState.recovering() && currentState.secondary()) {
- // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
- // received a heartbeat with an auth error when previously all the heartbeats we'd
- // received had auth errors. In this case, don't return makeElectAction() because
- // that could cause the election to start before the ReplicationCoordinator has updated
- // its notion of the member state to SECONDARY. Instead return noAction so that the
- // ReplicationCoordinator knows to update its tracking of the member state off of the
- // TopologyCoordinator, and leave starting the election until the next heartbeat comes
- // back.
- return HeartbeatResponseAction::makeNoAction();
- }
-
- // At this point, there is no primary anywhere. Check to see if we should become a candidate.
- const auto status = checkShouldStandForElection(now);
- if (!status.isOK()) {
- // NOTE: This log line is checked in unit test(s).
- LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason();
- return HeartbeatResponseAction::makeNoAction();
- }
- fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
- return HeartbeatResponseAction::makeElectAction();
-}
-
-Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const {
- if (_currentPrimaryIndex != -1) {
- return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
- }
- invariant(_role != Role::leader);
-
- if (_role == Role::candidate) {
- return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
- }
-
- const UnelectableReasonMask unelectableReason =
- _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout);
- if (NotCloseEnoughToLatestOptime & unelectableReason) {
- return {ErrorCodes::NodeNotElectable,
- str::stream() << "Not standing for election because "
- << _getUnelectableReasonString(unelectableReason)
- << "; my last optime is "
- << getMyLastAppliedOpTime().toString()
- << " and the newest is "
- << _latestKnownOpTime().toString()};
- }
- if (unelectableReason) {
- return {ErrorCodes::NodeNotElectable,
- str::stream() << "Not standing for election because "
- << _getUnelectableReasonString(unelectableReason)};
- }
- if (_electionSleepUntil > now) {
- if (_rsConfig.getProtocolVersion() == 1) {
- return {
- ErrorCodes::NodeNotElectable,
- str::stream() << "Not standing for election before "
- << dateToISOStringLocal(_electionSleepUntil)
- << " because I stood up or learned about a new term too recently"};
- } else {
- return {ErrorCodes::NodeNotElectable,
- str::stream() << "Not standing for election before "
- << dateToISOStringLocal(_electionSleepUntil)
- << " because I stood too recently"};
- }
- }
- // All checks passed. Start election proceedings.
- return Status::OK();
-}
-
-bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
- int vUp = 0;
- for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- if (itIndex == _selfIndex || it->up()) {
- vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
- }
- }
-
- return vUp * 2 > _rsConfig.getTotalVotingMembers();
-}
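
A sketch of the weighted-majority check in _aMajoritySeemsToBeUp; VoterViewSketch is a hypothetical stand-in for the per-member heartbeat data, and self always counts as up:

#include <vector>

struct VoterViewSketch {
    bool up;    // member responded to recent heartbeats
    int votes;  // this member's configured vote count
};

bool majoritySeemsUpSketch(const std::vector<VoterViewSketch>& members,
                           int selfIndex,
                           int totalVotingMembers) {
    int votesUp = 0;
    for (int i = 0; i < static_cast<int>(members.size()); ++i) {
        if (i == selfIndex || members[i].up)
            votesUp += members[i].votes;
    }
    // Strictly more than half of the voting members must be visible.
    return votesUp * 2 > totalVotingMembers;
}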
-
-int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority(
- const int candidateIndex) const {
- const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
- for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
- if (!it->up() || it->getState() != MemberState::RS_PRIMARY) {
- continue;
- }
- const int itIndex = indexOfIterator(_memberData, it);
- const double priority = _rsConfig.getMemberAt(itIndex).getPriority();
- if (itIndex != candidateIndex && priority >= candidatePriority) {
- return itIndex;
- }
- }
-
- return -1;
-}
-
-bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
- const OpTime latestKnownOpTime = _latestKnownOpTime();
- // Use addition instead of subtraction to avoid overflow.
- return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
-}
-
-bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const {
- const OpTime latestKnownOpTime = _latestKnownOpTime();
-
- // Rules are:
- // - If the terms don't match, we don't call for priority takeover.
- // - If our optime and the latest optime happen in different seconds, our optime must be within
- // priorityTakeoverFreshnessWindowSeconds seconds of the latest optime.
- // - If our optime and the latest optime happen in the same second, our optime must be within
- // 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp
- // must be within 1000). This is to handle the case where a primary had its clock set far into
- // the future, took some writes, then had its clock set back. In that case the timestamp
- // component of all future oplog entries generated will be the same, until real world time
- // passes the timestamp component of the last oplog entry.
-
- const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
- if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) {
- return false;
- }
-
- if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) {
- return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >=
- latestKnownOpTime.getTimestamp().getSecs();
- } else {
- return ourLastOpApplied.getTimestamp().getInc() + 1000 >=
- latestKnownOpTime.getTimestamp().getInc();
- }
-}
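
The freshness rule above compares the (term, seconds, increment) components of two optimes. A standalone sketch of the same arithmetic; windowSecs plays the role of priorityTakeoverFreshnessWindowSeconds, and 1000 is the same-second entry window used above:

#include <cstdint>

bool freshEnoughForPriorityTakeoverSketch(int64_t myTerm, int64_t mySecs, int64_t myInc,
                                          int64_t latestTerm, int64_t latestSecs,
                                          int64_t latestInc,
                                          int64_t windowSecs) {
    if (myTerm != latestTerm)
        return false;  // different terms: never call for a priority takeover
    if (mySecs != latestSecs)
        return mySecs + windowSecs >= latestSecs;  // different seconds: time window
    return myInc + 1000 >= latestInc;              // same second: entry-count window
}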
-
-bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const {
-
- const OpTime latestKnownOpTime = _latestKnownOpTime();
-
- // Rules are:
- // - We must have the freshest optime of all the up nodes.
- // - We must specifically have a fresher optime than the primary (can't be equal).
- // - The term of our last applied op must be less than the current term. This ensures that no
- // writes have happened since the most recent election and that the primary is still in
- // catchup mode.
-
- // There is no point to a catchup takeover if we aren't the freshest node because
- // another node would immediately perform another catchup takeover when we become primary.
- const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
- if (ourLastOpApplied < latestKnownOpTime) {
- return false;
- }
-
- if (_currentPrimaryIndex == -1) {
- return false;
- }
-
- // If we aren't ahead of the primary, there is no point to having a catchup takeover.
- const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime();
-
- if (ourLastOpApplied <= primaryLastOpApplied) {
- return false;
- }
-
- // If the term of our last applied op is less than the current term, the primary didn't write
- // anything and it is still in catchup mode.
- return ourLastOpApplied.getTerm() < _term;
-}
-
-bool TopologyCoordinatorImpl::_iAmPrimary() const {
- if (_role == Role::leader) {
- invariant(_currentPrimaryIndex == _selfIndex);
- invariant(_leaderMode != LeaderMode::kNotLeader);
- return true;
- }
- return false;
-}
-
-OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const {
- OpTime latest = getMyLastAppliedOpTime();
- for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
- ++it) {
- // Ignore self
- // TODO(russotto): Simplify when heartbeat and spanning tree times are combined.
- if (it->isSelf()) {
- continue;
- }
- // Ignore down members
- if (!it->up()) {
- continue;
- }
- // Ignore removed nodes (not in config, so not valid).
- if (it->getState().removed()) {
- continue;
- }
-
- OpTime optime = it->getHeartbeatAppliedOpTime();
-
- if (optime > latest) {
- latest = optime;
- }
- }
-
- return latest;
-}
-
-bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
- int memberTwoIndex) const {
- if (memberOneIndex == -1)
- return false;
-
- if (memberTwoIndex == -1)
- return true;
-
- return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
- _rsConfig.getMemberAt(memberTwoIndex).getPriority();
-}
-
-int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const {
- int maxIndex = -1;
- for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
- UnelectableReasonMask reason = currentIndex == _selfIndex
- ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)
- : _getUnelectableReason(currentIndex);
- if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
- maxIndex = currentIndex;
- }
- }
-
- return maxIndex;
-}
-
-bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() {
- if (_leaderMode == LeaderMode::kSteppingDown) {
- // Can only be processing one required stepdown at a time.
- return false;
- }
- // Heartbeat-initiated stepdowns take precedence over stepdown-command-initiated stepdowns, so
- // it's safe to transition from kAttemptingStepDown to kSteppingDown.
- _setLeaderMode(LeaderMode::kSteppingDown);
- return true;
-}
-
-Status TopologyCoordinatorImpl::prepareForStepDownAttempt() {
- if (_leaderMode == LeaderMode::kSteppingDown ||
- _leaderMode == LeaderMode::kAttemptingStepDown) {
- return Status{ErrorCodes::ConflictingOperationInProgress,
- "This node is already in the process of stepping down"};
- }
- _setLeaderMode(LeaderMode::kAttemptingStepDown);
- return Status::OK();
-}
-
-void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() {
- if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
- _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster);
- }
-}
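
The three functions above form a small state machine over LeaderMode. A condensed sketch with a stand-in enum covering only the states these functions touch:

enum class LeaderModeSketch { kMaster, kAttemptingStepDown, kSteppingDown };

// Unconditional (heartbeat-initiated) stepdown: allowed unless one is already
// in progress, and it overrides a pending stepdown-command attempt.
bool prepareForUnconditionalStepDownSketch(LeaderModeSketch& mode) {
    if (mode == LeaderModeSketch::kSteppingDown)
        return false;
    mode = LeaderModeSketch::kSteppingDown;
    return true;
}

// Stepdown command: refused while any stepdown is already underway.
bool prepareForStepDownAttemptSketch(LeaderModeSketch& mode) {
    if (mode == LeaderModeSketch::kSteppingDown ||
        mode == LeaderModeSketch::kAttemptingStepDown)
        return false;
    mode = LeaderModeSketch::kAttemptingStepDown;
    return true;
}

// Abandoning a command-initiated attempt returns the node to plain mastership.
void abortAttemptedStepDownIfNeededSketch(LeaderModeSketch& mode) {
    if (mode == LeaderModeSketch::kAttemptingStepDown)
        mode = LeaderModeSketch::kMaster;
}

The key asymmetry is that an unconditional, heartbeat-driven stepdown may override a pending command-driven attempt, but not the other way around.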
-
-void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState,
- const Timestamp& electionTime) {
- invariant(_selfIndex != -1);
- if (newMemberState == getMemberState())
- return;
- switch (newMemberState.s) {
- case MemberState::RS_PRIMARY:
- _role = Role::candidate;
- processWinElection(OID(), electionTime);
- invariant(_role == Role::leader);
- break;
- case MemberState::RS_SECONDARY:
- case MemberState::RS_ROLLBACK:
- case MemberState::RS_RECOVERING:
- case MemberState::RS_STARTUP2:
- _role = Role::follower;
- _followerMode = newMemberState.s;
- if (_currentPrimaryIndex == _selfIndex) {
- _currentPrimaryIndex = -1;
- _setLeaderMode(LeaderMode::kNotLeader);
- }
- break;
- case MemberState::RS_STARTUP:
- updateConfig(ReplSetConfig(), -1, Date_t());
- break;
- default:
- severe() << "Cannot switch to state " << newMemberState;
- invariant(false);
- }
- if (getMemberState() != newMemberState.s) {
- severe() << "Expected to enter state " << newMemberState << " but am now in "
- << getMemberState();
- invariant(false);
- }
- log() << newMemberState;
-}
-
-void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
- if (primaryIndex == _selfIndex) {
- changeMemberState_forTest(MemberState::RS_PRIMARY);
- } else {
- if (_iAmPrimary()) {
- changeMemberState_forTest(MemberState::RS_SECONDARY);
- }
- if (primaryIndex != -1) {
- ReplSetHeartbeatResponse hbResponse;
- hbResponse.setState(MemberState::RS_PRIMARY);
- hbResponse.setElectionTime(Timestamp());
- hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
- hbResponse.setSyncingTo(HostAndPort());
- hbResponse.setHbMsg("");
- _memberData.at(primaryIndex)
- .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
- std::move(hbResponse));
- }
- _currentPrimaryIndex = primaryIndex;
- }
-}
-
-const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
- if (_currentPrimaryIndex == -1)
- return NULL;
-
- return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
-}
-
-void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
- BSONObjBuilder* response,
- Status* result) {
- // output for each member
- vector<BSONObj> membersOut;
- const MemberState myState = getMemberState();
- const Date_t now = rsStatusArgs.now;
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- const OpTime lastOpDurable = getMyLastDurableOpTime();
- const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus;
-
- if (_selfIndex == -1) {
- // We're REMOVED or have an invalid config
- response->append("state", static_cast<int>(myState.s));
- response->append("stateStr", myState.toString());
- response->append("uptime", rsStatusArgs.selfUptime);
-
- appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
-
- response->appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
- if (_maintenanceModeCalls) {
- response->append("maintenanceMode", _maintenanceModeCalls);
- }
- std::string s = _getHbmsg(now);
- if (!s.empty())
- response->append("infoMessage", s);
- *result = Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set config is invalid or we are not a member of it");
- return;
- }
-
- for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- if (itIndex == _selfIndex) {
- // add self
- BSONObjBuilder bb;
- bb.append("_id", _selfConfig().getId());
- bb.append("name", _selfConfig().getHostAndPort().toString());
- bb.append("health", 1.0);
- bb.append("state", static_cast<int>(myState.s));
- bb.append("stateStr", myState.toString());
- bb.append("uptime", rsStatusArgs.selfUptime);
- if (!_selfConfig().isArbiter()) {
- appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
- bb.appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
- }
-
- if (!_syncSource.empty() && !_iAmPrimary()) {
- bb.append("syncingTo", _syncSource.toString());
- }
-
- if (_maintenanceModeCalls) {
- bb.append("maintenanceMode", _maintenanceModeCalls);
- }
-
- std::string s = _getHbmsg(now);
- if (!s.empty())
- bb.append("infoMessage", s);
-
- if (myState.primary()) {
- bb.append("electionTime", _electionTime);
- bb.appendDate("electionDate",
- Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
- }
- bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
- bb.append("self", true);
- membersOut.push_back(bb.obj());
- } else {
- // add non-self member
- const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
- BSONObjBuilder bb;
- bb.append("_id", itConfig.getId());
- bb.append("name", itConfig.getHostAndPort().toString());
- double h = it->getHealth();
- bb.append("health", h);
- const MemberState state = it->getState();
- bb.append("state", static_cast<int>(state.s));
- if (h == 0) {
- // if we can't connect, the state info is from the past
- // and could be confusing to show
- bb.append("stateStr", "(not reachable/healthy)");
- } else {
- bb.append("stateStr", it->getState().toString());
- }
-
- const unsigned int uptime = static_cast<unsigned int>((
- it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0));
- bb.append("uptime", uptime);
- if (!itConfig.isArbiter()) {
- appendOpTime(
- &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion());
- appendOpTime(&bb,
- "optimeDurable",
- it->getHeartbeatDurableOpTime(),
- _rsConfig.getProtocolVersion());
-
- bb.appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(
- Seconds(it->getHeartbeatAppliedOpTime().getSecs())));
- bb.appendDate("optimeDurableDate",
- Date_t::fromDurationSinceEpoch(
- Seconds(it->getHeartbeatDurableOpTime().getSecs())));
- }
- bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
- bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
- Milliseconds ping = _getPing(itConfig.getHostAndPort());
- if (ping != UninitializedPing) {
- bb.append("pingMs", durationCount<Milliseconds>(ping));
- std::string s = it->getLastHeartbeatMsg();
- if (!s.empty())
- bb.append("lastHeartbeatMessage", s);
- }
- if (it->hasAuthIssue()) {
- bb.append("authenticated", false);
- }
- const HostAndPort& syncSource = it->getSyncSource();
- if (!syncSource.empty() && !state.primary()) {
- bb.append("syncingTo", syncSource.toString());
- }
-
- if (state == MemberState::RS_PRIMARY) {
- bb.append("electionTime", it->getElectionTime());
- bb.appendDate(
- "electionDate",
- Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
- }
- bb.appendIntOrLL("configVersion", it->getConfigVersion());
- membersOut.push_back(bb.obj());
- }
- }
-
- // sort members bson
- sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan());
-
- response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
- response->append("date", now);
- response->append("myState", myState.s);
- response->append("term", _term);
-
- // Add sync source info
- if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
- response->append("syncingTo", _syncSource.toString());
- }
-
- if (_rsConfig.isConfigServer()) {
- response->append("configsvr", true);
- }
-
- response->append("heartbeatIntervalMillis",
- durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval()));
-
- // New optimes, to hold them all.
- BSONObjBuilder optimes;
- _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
- if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
- rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
- }
-
- appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
- appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
- response->append("optimes", optimes.obj());
-
- if (!initialSyncStatus.isEmpty()) {
- response->append("initialSyncStatus", initialSyncStatus);
- }
-
- response->append("members", membersOut);
- *result = Status::OK();
-}
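// Rough shape of the resulting replSetGetStatus document, per the fields appended above
// (values and member sub-fields abbreviated; optional fields noted):
//   { set: "<replSetName>", date: <now>, myState: <int>, term: <long>,
//     syncingTo: "<host:port>",                    // only when syncing and not primary
//     configsvr: true,                             // only on config servers
//     heartbeatIntervalMillis: <long>,
//     optimes: { lastCommittedOpTime: ..., readConcernMajorityOpTime: ...,  // latter if set
//                appliedOpTime: ..., durableOpTime: ... },
//     initialSyncStatus: { ... },                  // only while present
//     members: [ { _id, name, health, state, stateStr, uptime, optime, ... }, ... ] }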
-
-StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand(
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
- OpTime currentCommittedSnapshotOpTime) const {
- BSONObjBuilder cmdBuilder;
- invariant(_rsConfig.isInitialized());
- // Do not send updates if we have been removed from the config.
- if (_selfIndex == -1) {
- return Status(ErrorCodes::NodeNotFound,
- "This node is not in the current replset configuration.");
- }
- cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
- // Create an array containing an entry for each live member connected to us and for ourself.
- BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
- for (const auto& memberData : _memberData) {
- if (memberData.getLastAppliedOpTime().isNull()) {
- // Don't include info on members we haven't heard from yet.
- continue;
- }
- // Don't include members we think are down.
- if (!memberData.isSelf() && memberData.lastUpdateStale()) {
- continue;
- }
-
- BSONObjBuilder entry(arrayBuilder.subobjStart());
- switch (commandStyle) {
- case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle:
- memberData.getLastDurableOpTime().append(
- &entry, UpdatePositionArgs::kDurableOpTimeFieldName);
- memberData.getLastAppliedOpTime().append(
- &entry, UpdatePositionArgs::kAppliedOpTimeFieldName);
- break;
- case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle:
- entry.append("_id", memberData.getRid());
- if (_rsConfig.getProtocolVersion() == 1) {
- memberData.getLastDurableOpTime().append(&entry, "optime");
- } else {
- entry.append("optime", memberData.getLastDurableOpTime().getTimestamp());
- }
- break;
- }
- entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
- entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
- }
- arrayBuilder.done();
-
- // Add metadata to command. Old style parsing logic will reject the metadata.
- if (commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) {
- prepareReplSetMetadata(currentCommittedSnapshotOpTime)
- .writeToMetadata(&cmdBuilder)
- .transitional_ignore();
- }
- return cmdBuilder.obj();
-}
-
-void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) {
- BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
- {
- for (const auto& memberData : _memberData) {
- BSONObjBuilder entry(replicationProgress.subobjStart());
- entry.append("rid", memberData.getRid());
- const auto lastDurableOpTime = memberData.getLastDurableOpTime();
- if (_rsConfig.getProtocolVersion() == 1) {
- BSONObjBuilder opTime(entry.subobjStart("optime"));
- opTime.append("ts", lastDurableOpTime.getTimestamp());
- opTime.append("term", lastDurableOpTime.getTerm());
- opTime.done();
- } else {
- entry.append("optime", lastDurableOpTime.getTimestamp());
- }
- entry.append("host", memberData.getHostAndPort().toString());
- if (_selfIndex >= 0) {
- const int memberId = memberData.getMemberId();
- invariant(memberId >= 0);
- entry.append("memberId", memberId);
- }
- }
- }
-}
-
-void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
- const MemberState myState = getMemberState();
- if (!_rsConfig.isInitialized()) {
- response->markAsNoConfig();
- return;
- }
-
- for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
- ++it) {
- if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
- continue;
- }
-
- if (it->isElectable()) {
- response->addHost(it->getHostAndPort());
- } else if (it->isArbiter()) {
- response->addArbiter(it->getHostAndPort());
- } else {
- response->addPassive(it->getHostAndPort());
- }
- }
-
- response->setReplSetName(_rsConfig.getReplSetName());
- if (myState.removed()) {
- response->markAsNoConfig();
- return;
- }
-
- response->setReplSetVersion(_rsConfig.getConfigVersion());
- response->setIsMaster(myState.primary());
- response->setIsSecondary(myState.secondary());
-
- const MemberConfig* curPrimary = _currentPrimaryMember();
- if (curPrimary) {
- response->setPrimary(curPrimary->getHostAndPort());
- }
-
- const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
- if (selfConfig.isArbiter()) {
- response->setIsArbiterOnly(true);
- } else if (selfConfig.getPriority() == 0) {
- response->setIsPassive(true);
- }
- if (selfConfig.getSlaveDelay() > Seconds(0)) {
- response->setSlaveDelay(selfConfig.getSlaveDelay());
- }
- if (selfConfig.isHidden()) {
- response->setIsHidden(true);
- }
- if (!selfConfig.shouldBuildIndexes()) {
- response->setShouldBuildIndexes(false);
- }
- const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
- if (selfConfig.hasTags(tagConfig)) {
- for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
- ++tag) {
- std::string tagKey = tagConfig.getTagKey(*tag);
- if (tagKey[0] == '$') {
- // Filter out internal tags
- continue;
- }
- response->addTag(tagKey, tagConfig.getTagValue(*tag));
- }
- }
- response->setMe(selfConfig.getHostAndPort());
- if (_iAmPrimary()) {
- response->setElectionId(_electionId);
- }
-}
-
-StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult>
-TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
- if (_role != TopologyCoordinator::Role::follower) {
- std::string msg = str::stream()
- << "cannot freeze node when primary or running for election. state: "
- << (_role == TopologyCoordinator::Role::leader ? "Primary" : "Running-Election");
- log() << msg;
- return Status(ErrorCodes::NotSecondary, msg);
- }
-
- if (secs == 0) {
- _stepDownUntil = now;
- log() << "'unfreezing'";
- response->append("info", "unfreezing");
-
- if (_isElectableNodeInSingleNodeReplicaSet()) {
- // If we are a one-node replica set, we're the one member, we're electable, we're not
- // in maintenance mode, and we are currently in followerMode SECONDARY, so we must
- // transition to candidate now that our stepdown period is no longer active, in lieu
- // of heartbeats.
- _role = Role::candidate;
- return PrepareFreezeResponseResult::kElectSelf;
- }
- } else {
- if (secs == 1)
- response->append("warning", "you really want to freeze for only 1 second?");
-
- _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
- log() << "'freezing' for " << secs << " seconds";
- }
-
- return PrepareFreezeResponseResult::kNoAction;
-}
-
-bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
- if (_stepDownUntil > now) {
- return false;
- }
-
- if (_isElectableNodeInSingleNodeReplicaSet()) {
- // If the config describes a one-node replica set, we're the one member, we're electable,
- // we're not in maintenance mode, and we are currently in followerMode SECONDARY, so we
- // must transition to candidate, in lieu of heartbeats.
- _role = Role::candidate;
- return true;
- }
- return false;
-}
-
-void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) {
- if (_electionSleepUntil < newTime) {
- _electionSleepUntil = newTime;
- }
-}
-
-Timestamp TopologyCoordinatorImpl::getElectionTime() const {
- return _electionTime;
-}
-
-OID TopologyCoordinatorImpl::getElectionId() const {
- return _electionId;
-}
-
-int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const {
- return _currentPrimaryIndex;
-}
-
-Date_t TopologyCoordinatorImpl::getStepDownTime() const {
- return _stepDownUntil;
-}
-
-void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
- int selfIndex,
- Date_t now) {
- std::vector<MemberData> oldHeartbeats;
- _memberData.swap(oldHeartbeats);
-
- int index = 0;
- for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd();
- ++it, ++index) {
- const MemberConfig& newMemberConfig = *it;
- MemberData newHeartbeatData;
- for (auto&& oldMemberData : oldHeartbeats) {
- if ((oldMemberData.getMemberId() == newMemberConfig.getId() &&
- oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) ||
- (index == selfIndex && oldMemberData.isSelf())) {
- // This member existed in the old config with the same member ID and
- // HostAndPort, so copy its heartbeat data over.
- newHeartbeatData = oldMemberData;
- break;
- }
- }
- newHeartbeatData.setConfigIndex(index);
- newHeartbeatData.setIsSelf(index == selfIndex);
- newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort());
- newHeartbeatData.setMemberId(newMemberConfig.getId());
- _memberData.push_back(newHeartbeatData);
- }
- if (selfIndex < 0) {
- // It's necessary to have self member data even if self isn't in the configuration.
- // We don't need data for the other nodes (which no longer know about us, or soon won't)
- _memberData.clear();
- // We're not in the config, we can't sync any more.
- _syncSource = HostAndPort();
- MemberData newHeartbeatData;
- for (auto&& oldMemberData : oldHeartbeats) {
- if (oldMemberData.isSelf()) {
- newHeartbeatData = oldMemberData;
- break;
- }
- }
- newHeartbeatData.setConfigIndex(-1);
- newHeartbeatData.setIsSelf(true);
- _memberData.push_back(newHeartbeatData);
- }
-}
-
-// This function installs a new config object and recreates MemberData objects
-// that reflect the new config.
-void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig,
- int selfIndex,
- Date_t now) {
- invariant(_role != Role::candidate);
- invariant(selfIndex < newConfig.getNumMembers());
-
- // Reset term on startup and upgrade/downgrade of protocol version.
- if (!_rsConfig.isInitialized() ||
- _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) {
- if (newConfig.getProtocolVersion() == 1) {
- _term = OpTime::kInitialTerm;
- } else {
- invariant(newConfig.getProtocolVersion() == 0);
- _term = OpTime::kUninitializedTerm;
- }
- LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config";
- }
-
- _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
- _rsConfig = newConfig;
- _selfIndex = selfIndex;
- _forceSyncSourceIndex = -1;
-
- if (_role == Role::leader) {
- if (_selfIndex == -1) {
- log() << "Could not remain primary because no longer a member of the replica set";
- } else if (!_selfConfig().isElectable()) {
- log() << " Could not remain primary because no longer electable";
- } else {
- // Don't stepdown if you don't have to.
- _currentPrimaryIndex = _selfIndex;
- return;
- }
- _role = Role::follower;
- _setLeaderMode(LeaderMode::kNotLeader);
- }
-
- // By this point we know we are in Role::follower
- _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is
-
- if (_isElectableNodeInSingleNodeReplicaSet()) {
- // If the new config describes a one-node replica set, we're the one member,
- // we're electable, we're not in maintenance mode, and we are currently in followerMode
- // SECONDARY, so we must transition to candidate, in lieu of heartbeats.
- _role = Role::candidate;
- }
-}
-
-std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
- // ignore messages over 2 minutes old
- if ((now - _hbmsgTime) > Seconds{120}) {
- return "";
- }
- return _hbmsg;
-}
-
-void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
- _hbmsgTime = now;
- _hbmsg = message;
-}
-
-const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
- return _rsConfig.getMemberAt(_selfIndex);
-}
-
-const MemberData& TopologyCoordinatorImpl::_selfMemberData() const {
- return _memberData[_selfMemberDataIndex()];
-}
-
-const int TopologyCoordinatorImpl::_selfMemberDataIndex() const {
- invariant(!_memberData.empty());
- if (_selfIndex >= 0)
- return _selfIndex;
- // In master-slave mode, the first entry is for self. If there is no config
- // or we're not in the config, the first-and-only entry should be for self.
- return 0;
-}
-
-TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
- int index) const {
- invariant(index != _selfIndex);
- const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
- const MemberData& hbData = _memberData.at(index);
- UnelectableReasonMask result = None;
- if (memberConfig.isArbiter()) {
- result |= ArbiterIAm;
- }
- if (memberConfig.getPriority() <= 0) {
- result |= NoPriority;
- }
- if (hbData.getState() != MemberState::RS_SECONDARY) {
- result |= NotSecondary;
- }
- if (_rsConfig.getProtocolVersion() == 0 &&
- !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
- result |= NotCloseEnoughToLatestOptime;
- }
- if (hbData.up() && hbData.isUnelectable()) {
- result |= RefusesToStand;
- }
- invariant(result || memberConfig.isElectable());
- return result;
-}
-
-TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
- const Date_t now, StartElectionReason reason) const {
- UnelectableReasonMask result = None;
- const OpTime lastApplied = getMyLastAppliedOpTime();
- if (lastApplied.isNull()) {
- result |= NoData;
- }
- if (!_aMajoritySeemsToBeUp()) {
- result |= CannotSeeMajority;
- }
- if (_selfIndex == -1) {
- result |= NotInitialized;
- return result;
- }
- if (_selfConfig().isArbiter()) {
- result |= ArbiterIAm;
- }
- if (_selfConfig().getPriority() <= 0) {
- result |= NoPriority;
- }
- if (_stepDownUntil > now) {
- result |= StepDownPeriodActive;
- }
-
- // Cannot be electable unless secondary or already primary
- if (!getMemberState().secondary() && !_iAmPrimary()) {
- result |= NotSecondary;
- }
-
- if (_rsConfig.getProtocolVersion() == 0) {
- // Election rules only for protocol version 0.
- if (_voteLease.whoId != -1 &&
- _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
- _voteLease.when + VoteLease::leaseTime >= now) {
- result |= VotedTooRecently;
- }
- if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) {
- result |= NotCloseEnoughToLatestOptime;
- }
- } else {
- // Election rules only for protocol version 1.
- invariant(_rsConfig.getProtocolVersion() == 1);
- if (reason == StartElectionReason::kPriorityTakeover &&
- !_amIFreshEnoughForPriorityTakeover()) {
- result |= NotCloseEnoughToLatestForPriorityTakeover;
- }
-
- if (reason == StartElectionReason::kCatchupTakeover &&
- !_amIFreshEnoughForCatchupTakeover()) {
- result |= NotFreshEnoughForCatchupTakeover;
- }
- }
- return result;
-}
-
-std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
- const UnelectableReasonMask ur) const {
- invariant(ur);
- str::stream ss;
- bool hasWrittenToStream = false;
- if (ur & NoData) {
- ss << "node has no applied oplog entries";
- hasWrittenToStream = true;
- }
- if (ur & VotedTooRecently) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
- }
- if (ur & CannotSeeMajority) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I cannot see a majority";
- }
- if (ur & ArbiterIAm) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is an arbiter";
- }
- if (ur & NoPriority) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member has zero priority";
- }
- if (ur & StepDownPeriodActive) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I am still waiting for stepdown period to end at "
- << dateToISOStringLocal(_stepDownUntil);
- }
- if (ur & NotSecondary) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is not currently a secondary";
- }
- if (ur & NotCloseEnoughToLatestOptime) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is more than 10 seconds behind the most up-to-date member";
- }
- if (ur & NotCloseEnoughToLatestForPriorityTakeover) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is not caught up enough to the most up-to-date member to call for priority "
- "takeover - must be within "
- << priorityTakeoverFreshnessWindowSeconds << " seconds";
- }
- if (ur & NotFreshEnoughForCatchupTakeover) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is either not the most up-to-date member or not ahead of the primary, and "
- "therefore cannot call for catchup takeover";
- }
- if (ur & NotInitialized) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "node is not a member of a valid replica set configuration";
- }
- if (ur & RefusesToStand) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "most recent heartbeat indicates node will not stand for election";
- }
- if (!hasWrittenToStream) {
- severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
- fassertFailed(26011);
- }
- ss << " (mask 0x" << integerToHex(ur) << ")";
- return ss;
-}
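// For example, a zero-priority node that is not currently SECONDARY would produce roughly:
//   "member has zero priority; member is not currently a secondary (mask 0x18)"
// (NoPriority = 0x10 and NotSecondary = 0x8 in the UnelectableReason bitmask declared in
// topology_coordinator_impl.h below.)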
-
-Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) {
- return _pings[host].getMillis();
-}
-
-void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) {
- _electionTime = newElectionTime;
-}
-
-int TopologyCoordinatorImpl::_getTotalPings() {
- PingMap::iterator it = _pings.begin();
- PingMap::iterator end = _pings.end();
- int totalPings = 0;
- while (it != end) {
- totalPings += it->second.getCount();
- it++;
- }
- return totalPings;
-}
-
-std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const {
- std::vector<HostAndPort> upHosts;
- for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- if (itIndex == _selfIndex) {
- continue; // skip ourselves
- }
- if (!it->maybeUp()) {
- continue; // skip DOWN nodes
- }
-
- upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
- }
- return upHosts;
-}
-
-bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
- if (_role != Role::candidate) {
- return false;
- }
- int selfId = _selfConfig().getId();
- if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
- log() << "not voting yea for " << selfId << " voted for "
- << _voteLease.whoHostAndPort.toString() << ' '
- << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
- return false;
- }
- _voteLease.when = now;
- _voteLease.whoId = selfId;
- _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
- return true;
-}
-
-bool TopologyCoordinatorImpl::isSteppingDown() const {
- return _leaderMode == LeaderMode::kAttemptingStepDown ||
- _leaderMode == LeaderMode::kSteppingDown;
-}
-
-void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
- // Invariants for valid state transitions.
- switch (_leaderMode) {
- case LeaderMode::kNotLeader:
- invariant(newMode == LeaderMode::kLeaderElect);
- break;
- case LeaderMode::kLeaderElect:
- invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
- newMode == LeaderMode::kMaster ||
- newMode == LeaderMode::kAttemptingStepDown ||
- newMode == LeaderMode::kSteppingDown);
- break;
- case LeaderMode::kMaster:
- invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
- newMode == LeaderMode::kAttemptingStepDown ||
- newMode == LeaderMode::kSteppingDown);
- break;
- case LeaderMode::kAttemptingStepDown:
- invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster ||
- newMode == LeaderMode::kSteppingDown);
- break;
- case LeaderMode::kSteppingDown:
- invariant(newMode == LeaderMode::kNotLeader);
- break;
- }
- _leaderMode = std::move(newMode);
-}
-
-MemberState TopologyCoordinatorImpl::getMemberState() const {
- if (_selfIndex == -1) {
- if (_rsConfig.isInitialized()) {
- return MemberState::RS_REMOVED;
- }
- return MemberState::RS_STARTUP;
- }
-
- if (_rsConfig.isConfigServer()) {
- if (_options.clusterRole != ClusterRole::ConfigServer) {
- return MemberState::RS_REMOVED;
- } else {
- invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown);
- if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) {
- return MemberState::RS_REMOVED;
- }
- }
- } else {
- if (_options.clusterRole == ClusterRole::ConfigServer) {
- return MemberState::RS_REMOVED;
- }
- }
-
- if (_role == Role::leader) {
- invariant(_currentPrimaryIndex == _selfIndex);
- invariant(_leaderMode != LeaderMode::kNotLeader);
- return MemberState::RS_PRIMARY;
- }
- const MemberConfig& myConfig = _selfConfig();
- if (myConfig.isArbiter()) {
- return MemberState::RS_ARBITER;
- }
- if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) &&
- (_followerMode == MemberState::RS_SECONDARY)) {
- return MemberState::RS_RECOVERING;
- }
- return _followerMode;
-}
-
-bool TopologyCoordinatorImpl::canAcceptWrites() const {
- return _leaderMode == LeaderMode::kMaster;
-}
-
-void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) {
- invariant(_role == Role::leader);
- _electionTime = electionOpTime;
- _electionId = electionId;
-}
-
-void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) {
- invariant(_role == Role::candidate);
- invariant(_leaderMode == LeaderMode::kNotLeader);
- _role = Role::leader;
- _setLeaderMode(LeaderMode::kLeaderElect);
- setElectionInfo(electionId, electionOpTime);
- _currentPrimaryIndex = _selfIndex;
- _syncSource = HostAndPort();
- _forceSyncSourceIndex = -1;
- // Prevent last committed optime from updating until we finish draining.
- _firstOpTimeOfMyTerm =
- OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
-}
-
-void TopologyCoordinatorImpl::processLoseElection() {
- invariant(_role == Role::candidate);
- invariant(_leaderMode == LeaderMode::kNotLeader);
- const HostAndPort syncSourceAddress = getSyncSourceAddress();
- _electionTime = Timestamp(0, 0);
- _electionId = OID();
- _role = Role::follower;
-
- // Clear voteLease time, if we voted for ourselves in this election.
- // This will allow us to vote for others.
- if (_voteLease.whoId == _selfConfig().getId()) {
- _voteLease.when = Date_t();
- }
-}
-
-bool TopologyCoordinatorImpl::attemptStepDown(
- long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
-
- if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) {
- uasserted(ErrorCodes::PrimarySteppedDown,
- "While waiting for secondaries to catch up before stepping down, "
- "this node decided to step down for other reasons");
- }
- invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
-
- if (now >= stepDownUntil) {
- uasserted(ErrorCodes::ExceededTimeLimit,
- "By the time we were ready to step down, we were already past the "
- "time we were supposed to step down until");
- }
-
- if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
- // Stepdown attempt failed.
-
- // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
- // if secondaryCatchUpPeriodSecs == 0.
- if (now >= waitUntil) {
- uasserted(ErrorCodes::ExceededTimeLimit,
- str::stream() << "No electable secondaries caught up as of "
- << dateToISOStringLocal(now)
- << "Please use the replSetStepDown command with the argument "
- << "{force: true} to force node to step down.");
- }
-
- // Stepdown attempt failed, but in a way that can be retried
- return false;
- }
-
- // Stepdown attempt success!
- _stepDownUntil = stepDownUntil;
- _stepDownSelfAndReplaceWith(-1);
- return true;
-}
-
-bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
- Date_t waitUntil,
- bool force) {
- const bool forceNow = force && (now >= waitUntil);
- if (forceNow) {
- return true;
- }
-
- return isSafeToStepDown();
-}
-
-bool TopologyCoordinatorImpl::isSafeToStepDown() {
- if (!_rsConfig.isInitialized() || _selfIndex < 0) {
- return false;
- }
-
- OpTime lastApplied = getMyLastAppliedOpTime();
-
- auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
- invariant(tagStatus.isOK());
-
- // Check if a majority of nodes have reached the last applied optime.
- if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
- return false;
- }
-
- // Now check that we also have at least one caught up node that is electable.
- const OpTime lastOpApplied = getMyLastAppliedOpTime();
- for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
- // skip ourselves
- if (memberIndex == _selfIndex) {
- continue;
- }
- UnelectableReasonMask reason = _getUnelectableReason(memberIndex);
- if (!reason && _memberData.at(memberIndex).getHeartbeatAppliedOpTime() >= lastOpApplied) {
- // Found a caught up and electable node, succeed with step down.
- return true;
- }
- }
-
- return false;
-}
-
-void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
- invariant(_role == Role::follower);
- switch (newMode) {
- case MemberState::RS_RECOVERING:
- case MemberState::RS_ROLLBACK:
- case MemberState::RS_SECONDARY:
- case MemberState::RS_STARTUP2:
- _followerMode = newMode;
- break;
- default:
- invariant(false);
- }
-
- if (_followerMode != MemberState::RS_SECONDARY) {
- return;
- }
-
- // When a single node replica set transitions to SECONDARY, we must check if we should
- // be a candidate here. This is necessary because a single node replica set has no
- // heartbeats that would normally change the role to candidate.
-
- if (_isElectableNodeInSingleNodeReplicaSet()) {
- _role = Role::candidate;
- }
-}
-
-bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const {
- return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
- _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
- _maintenanceModeCalls == 0;
-}
-
-void TopologyCoordinatorImpl::finishUnconditionalStepDown() {
- invariant(_leaderMode == LeaderMode::kSteppingDown);
-
- int remotePrimaryIndex = -1;
- for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- if (itIndex == _selfIndex) {
- continue;
- }
-
- if (it->getState().primary() && it->up()) {
- if (remotePrimaryIndex != -1) {
- // two other nodes think they are primary (asynchronously polled)
- // -- wait for things to settle down.
- remotePrimaryIndex = -1;
- warning() << "two remote primaries (transiently)";
- break;
- }
- remotePrimaryIndex = itIndex;
- }
- }
- _stepDownSelfAndReplaceWith(remotePrimaryIndex);
-}
-
-void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
- invariant(_role == Role::leader);
- invariant(_selfIndex != -1);
- invariant(_selfIndex != newPrimary);
- invariant(_selfIndex == _currentPrimaryIndex);
- _currentPrimaryIndex = newPrimary;
- _role = Role::follower;
- _setLeaderMode(LeaderMode::kNotLeader);
-}
-
-bool TopologyCoordinatorImpl::updateLastCommittedOpTime() {
- // If we're not primary or we're stepping down due to learning of a new term then we must not
- // advance the commit point. If we are stepping down due to a user request, however, then it
- // is safe to advance the commit point, and in fact we must since the stepdown request may be
- // waiting for the commit point to advance enough to be able to safely complete the step down.
- if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) {
- return false;
- }
-
- // Whether we use the applied or durable OpTime for the commit point is decided here.
- const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();
-
- std::vector<OpTime> votingNodesOpTimes;
- for (const auto& memberData : _memberData) {
- int memberIndex = memberData.getConfigIndex();
- invariant(memberIndex >= 0);
- const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
- if (memberConfig.isVoter()) {
- const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
- : memberData.getLastAppliedOpTime();
- votingNodesOpTimes.push_back(opTime);
- }
- }
-
- invariant(votingNodesOpTimes.size() > 0);
- if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
- return false;
- }
- std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
-
- // need the majority to have this OpTime
- OpTime committedOpTime =
- votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
- return advanceLastCommittedOpTime(committedOpTime);
-}
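// A minimal standalone sketch of the commit-point index arithmetic above, with plain ints
// standing in for OpTime and a free function in place of the member code (these names are
// illustrative, not MongoDB API). Sorting the voting nodes' optimes ascending and taking
// the element at [size - writeMajority] yields the newest optime that at least a write
// majority of voters have reached.
#include <algorithm>
#include <cassert>
#include <vector>

int majorityCommittedOpTime(std::vector<int> votingNodesOpTimes, int writeMajority) {
    assert(writeMajority > 0 &&
           static_cast<int>(votingNodesOpTimes.size()) >= writeMajority);
    std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
    // The last writeMajority entries are all >= this element, so at least a write majority
    // of voters have reached it.
    return votingNodesOpTimes[votingNodesOpTimes.size() - writeMajority];
}

int main() {
    // Five voters with a write majority of three: the commit point is 30, which the nodes
    // at 30, 40 and 50 have all reached.
    assert(majorityCommittedOpTime({10, 40, 30, 50, 20}, 3) == 30);
    return 0;
}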
-
-bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) {
- if (committedOpTime == _lastCommittedOpTime) {
- return false; // Hasn't changed, so ignore it.
- } else if (committedOpTime < _lastCommittedOpTime) {
- LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
- << ", currentCommittedOpTime: " << _lastCommittedOpTime;
- return false; // This may have come from an out-of-order heartbeat. Ignore it.
- }
-
- // This check is performed to ensure primaries do not commit an OpTime from a previous term.
- if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
- LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
- << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
- return false;
- }
-
- LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
- _lastCommittedOpTime = committedOpTime;
- return true;
-}
-
-OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const {
- return _lastCommittedOpTime;
-}
-
-bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary(
- long long termWhenDrainCompleted) const {
-
- if (termWhenDrainCompleted != _term) {
- return false;
- }
- // Allow completing the transition to primary even when in the middle of a stepdown attempt,
- // in case the stepdown attempt fails.
- if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) {
- return false;
- }
-
- return true;
-}
-
-Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
- if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
- return Status(ErrorCodes::PrimarySteppedDown,
- "By the time this node was ready to complete its transition to PRIMARY it "
- "was no longer eligible to do so");
- }
- if (_leaderMode == LeaderMode::kLeaderElect) {
- _setLeaderMode(LeaderMode::kMaster);
- }
- _firstOpTimeOfMyTerm = firstOpTimeOfTerm;
- return Status::OK();
-}
-
-void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) {
- invariant(_role == Role::follower);
- _maintenanceModeCalls += inc;
- invariant(_maintenanceModeCalls >= 0);
-}
-
-int TopologyCoordinatorImpl::getMaintenanceCount() const {
- return _maintenanceModeCalls;
-}
-
-TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term,
- Date_t now) {
- if (term <= _term) {
- return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
- }
- // Don't run election if we just stood up or learned about a new term.
- _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod();
-
- // Don't update the term just yet if we are going to step down, as we don't want to report
- // that we are primary in the new term.
- if (_iAmPrimary()) {
- return TopologyCoordinator::UpdateTermResult::kTriggerStepDown;
- }
- LOG(1) << "Updating term from " << _term << " to " << term;
- _term = term;
- return TopologyCoordinator::UpdateTermResult::kUpdatedTerm;
-}
-
-
-long long TopologyCoordinatorImpl::getTerm() {
- return _term;
-}
-
-// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
-// replset. Passing metadata is unnecessary.
-bool TopologyCoordinatorImpl::shouldChangeSyncSource(
- const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- boost::optional<rpc::OplogQueryMetadata> oqMetadata,
- Date_t now) const {
- // Methodology:
- // If there exists a viable sync source member other than currentSource whose oplog is
- // more than _options.maxSyncSourceLagSecs ahead of currentSource's, return true.
- // If the currentSource has the same replication progress as we do and has no source for further
- // progress, return true.
-
- if (_selfIndex == -1) {
- log() << "Not choosing new sync source because we are not in the config.";
- return false;
- }
-
- // If the user requested a sync source change, return true.
- if (_forceSyncSourceIndex != -1) {
- log() << "Choosing new sync source because the user has requested to use "
- << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort()
- << " as a sync source";
- return true;
- }
-
- if (_rsConfig.getProtocolVersion() == 1 &&
- replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
- log() << "Choosing new sync source because the config version supplied by " << currentSource
- << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
- << _rsConfig.getConfigVersion();
- return true;
- }
-
- const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
- // PV0 doesn't use metadata, so we have to consult _rsConfig.
- if (currentSourceIndex == -1) {
- log() << "Choosing new sync source because " << currentSource.toString()
- << " is not in our config";
- return true;
- }
-
- invariant(currentSourceIndex != _selfIndex);
-
- // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
- // ReplSetMetadata.
- OpTime currentSourceOpTime;
- int syncSourceIndex = -1;
- int primaryIndex = -1;
- if (oqMetadata) {
- currentSourceOpTime =
- std::max(oqMetadata->getLastOpApplied(),
- _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
- syncSourceIndex = oqMetadata->getSyncSourceIndex();
- primaryIndex = oqMetadata->getPrimaryIndex();
- } else {
- currentSourceOpTime =
- std::max(replMetadata.getLastOpVisible(),
- _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
- syncSourceIndex = replMetadata.getSyncSourceIndex();
- primaryIndex = replMetadata.getPrimaryIndex();
- }
-
- if (currentSourceOpTime.isNull()) {
- // Haven't received a heartbeat from the sync source yet, so can't tell if we should
- // change.
- return false;
- }
-
- // Change sync source if they are not ahead of us, and don't have a sync source,
- // unless they are primary.
- const OpTime myLastOpTime = getMyLastAppliedOpTime();
- if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
- currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
- std::stringstream logMessage;
- logMessage << "Choosing new sync source because our current sync source, "
- << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
- << ") which is not ahead of ours (" << myLastOpTime
- << "), it does not have a sync source, and it's not the primary";
- if (primaryIndex >= 0) {
- logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
- } else {
- logMessage << " (sync source does not know the primary)";
- }
- log() << logMessage.str();
- return true;
- }
-
- if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
- log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
- "OpTime, "
- << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource
- << ", against the OpTimes of the other nodes in this replica set.";
- } else {
- unsigned int currentSecs = currentSourceOpTime.getSecs();
- unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs);
-
- for (std::vector<MemberData>::const_iterator it = _memberData.begin();
- it != _memberData.end();
- ++it) {
- const int itIndex = indexOfIterator(_memberData, it);
- const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
- if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
- (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
- it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
- goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) {
- log() << "Choosing new sync source because the most recent OpTime of our sync "
- "source, "
- << currentSource << ", is " << currentSourceOpTime.toString()
- << " which is more than " << _options.maxSyncSourceLagSecs
- << " behind member " << candidateConfig.getHostAndPort().toString()
- << " whose most recent OpTime is "
- << it->getHeartbeatAppliedOpTime().toString();
- invariant(itIndex != _selfIndex);
- return true;
- }
- }
- }
-
- return false;
-}
-
-rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata(
- const OpTime& lastVisibleOpTime) const {
- return rpc::ReplSetMetadata(_term,
- _lastCommittedOpTime,
- lastVisibleOpTime,
- _rsConfig.getConfigVersion(),
- _rsConfig.getReplicaSetId(),
- _currentPrimaryIndex,
- _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
-}
-
-rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const {
- return rpc::OplogQueryMetadata(_lastCommittedOpTime,
- getMyLastAppliedOpTime(),
- rbid,
- _currentPrimaryIndex,
- _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
-}
-
-void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
- output->setConfig(_rsConfig);
- output->setHBData(_memberData);
- output->setSelfIndex(_selfIndex);
- output->setPrimaryIndex(_currentPrimaryIndex);
- output->setSelfState(getMemberState());
- output->setSelfHeartbeatMessage(_hbmsg);
-}
-
-void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response) {
- response->setTerm(_term);
-
- if (args.getTerm() < _term) {
- response->setVoteGranted(false);
- response->setReason(str::stream() << "candidate's term (" << args.getTerm()
- << ") is lower than mine ("
- << _term
- << ")");
- } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
- response->setVoteGranted(false);
- response->setReason(str::stream() << "candidate's config version ("
- << args.getConfigVersion()
- << ") differs from mine ("
- << _rsConfig.getConfigVersion()
- << ")");
- } else if (args.getSetName() != _rsConfig.getReplSetName()) {
- response->setVoteGranted(false);
- response->setReason(str::stream() << "candidate's set name (" << args.getSetName()
- << ") differs from mine ("
- << _rsConfig.getReplSetName()
- << ")");
- } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) {
- response->setVoteGranted(false);
- response
- ->setReason(str::stream()
- << "candidate's data is staler than mine. candidate's last applied OpTime: "
- << args.getLastDurableOpTime().toString()
- << ", my last applied OpTime: "
- << getMyLastAppliedOpTime().toString());
- } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
- response->setVoteGranted(false);
- response->setReason(str::stream()
- << "already voted for another candidate ("
- << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort()
- << ") this term ("
- << _lastVote.getTerm()
- << ")");
- } else {
- int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex());
- if (_selfConfig().isArbiter() && betterPrimary >= 0) {
- response->setVoteGranted(false);
- response->setReason(str::stream()
- << "can see a healthy primary ("
- << _rsConfig.getMemberAt(betterPrimary).getHostAndPort()
- << ") of equal or greater priority");
- } else {
- if (!args.isADryRun()) {
- _lastVote.setTerm(args.getTerm());
- _lastVote.setCandidateIndex(args.getCandidateIndex());
- }
- response->setVoteGranted(true);
- }
- }
-}
-
-void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) {
- _lastVote = lastVote;
-}
-
-void TopologyCoordinatorImpl::voteForMyselfV1() {
- _lastVote.setTerm(_term);
- _lastVote.setCandidateIndex(_selfIndex);
-}
-
-void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) {
- _currentPrimaryIndex = primaryIndex;
-}
-
-Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now,
- StartElectionReason reason) {
- if (_role == Role::leader) {
- return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
- }
-
- if (_role == Role::candidate) {
- return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
- }
-
- const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason);
- if (unelectableReason) {
- return {ErrorCodes::NodeNotElectable,
- str::stream() << "Not standing for election because "
- << _getUnelectableReasonString(unelectableReason)};
- }
-
- // All checks passed, become a candidate and start election proceedings.
- _role = Role::candidate;
-
- return Status::OK();
-}
-
-void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) {
- _storageEngineSupportsReadCommitted =
- supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
-}
-
-void TopologyCoordinatorImpl::restartHeartbeats() {
- for (auto& hb : _memberData) {
- hb.restart();
- }
-}
-
-boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const {
- // The smallest OpTime in PV1.
- OpTime latest(Timestamp(0, 0), 0);
- for (size_t i = 0; i < _memberData.size(); i++) {
- auto& peer = _memberData[i];
-
- if (static_cast<int>(i) == _selfIndex) {
- continue;
- }
- // If any heartbeat is not fresh enough, return none.
- if (!peer.isUpdatedSinceRestart()) {
- return boost::none;
- }
- // Ignore down members
- if (!peer.up()) {
- continue;
- }
- if (peer.getHeartbeatAppliedOpTime() > latest) {
- latest = peer.getHeartbeatAppliedOpTime();
- }
- }
- return latest;
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
deleted file mode 100644
index 046ff2217a6..00000000000
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include "mongo/bson/timestamp.h"
-#include "mongo/db/repl/last_vote.h"
-#include "mongo/db/repl/member_data.h"
-#include "mongo/db/repl/member_state.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/topology_coordinator.h"
-#include "mongo/db/server_options.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class OperationContext;
-
-namespace rpc {
-class ReplSetMetadata;
-} // namespace rpc
-
-namespace repl {
-
-static const Milliseconds UninitializedPing{-1};
-
-/**
- * Represents a latency measurement for each replica set member based on heartbeat requests.
- * The measurement is an average weighted 80% to the old value, and 20% to the new value.
- *
- * Also stores information about heartbeat progress and retries.
- */
-class PingStats {
-public:
- /**
- * Records that a new heartbeat request started at "now".
- *
- * This resets the failure count used in determining whether the next request to a target
- * should be a retry or a regularly scheduled heartbeat message.
- */
- void start(Date_t now);
-
- /**
- * Records that a heartbeat request completed successfully, and that "millis" milliseconds
- * were spent for a single network roundtrip plus remote processing time.
- */
- void hit(Milliseconds millis);
-
- /**
- * Records that a heartbeat request failed.
- */
- void miss();
-
- /**
- * Gets the number of hit() calls.
- */
- unsigned int getCount() const {
- return count;
- }
-
- /**
- * Gets the weighted average round trip time for heartbeat messages to the target.
- * Returns 0 if there have been no pings recorded yet.
- */
- Milliseconds getMillis() const {
- return value == UninitializedPing ? Milliseconds(0) : value;
- }
-
- /**
- * Gets the date at which start() was last called, which is used to determine if
- * a heartbeat should be retried or if the time limit has expired.
- */
- Date_t getLastHeartbeatStartDate() const {
- return _lastHeartbeatStartDate;
- }
-
- /**
- * Gets the number of failures since start() was last called.
- *
- * This value is incremented by calls to miss(), cleared by calls to start() and
- * set to the maximum possible value by calls to hit().
- */
- int getNumFailuresSinceLastStart() const {
- return _numFailuresSinceLastStart;
- }
-
-private:
- unsigned int count = 0;
- Milliseconds value = UninitializedPing;
- Date_t _lastHeartbeatStartDate;
- int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
-};
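// A minimal sketch of the 80%/20% weighting described in the PingStats comment above; the
// definition of hit() is not shown in this diff, so this only illustrates the idea and the
// names here are not MongoDB API.
#include <chrono>

using PingMillis = std::chrono::milliseconds;
constexpr PingMillis kUninitializedPing{-1};

// Blend a new heartbeat round-trip sample into the running average:
// 80% previous value, 20% new sample.
PingMillis updateWeightedPing(PingMillis oldValue, PingMillis newSample) {
    if (oldValue == kUninitializedPing) {
        return newSample;  // First sample: nothing to average against yet.
    }
    return PingMillis((oldValue.count() * 4 + newSample.count()) / 5);
}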
-
-class TopologyCoordinatorImpl : public TopologyCoordinator {
-public:
- struct Options {
- // A sync source is re-evaluated after it lags behind further than this amount.
- Seconds maxSyncSourceLagSecs{0};
-
- // Whether or not this node is running as a config server.
- ClusterRole clusterRole{ClusterRole::None};
- };
-
- /**
- * Constructs a Topology Coordinator object.
- **/
- TopologyCoordinatorImpl(Options options);
-
- ////////////////////////////////////////////////////////////
- //
- // Implementation of TopologyCoordinator interface
- //
- ////////////////////////////////////////////////////////////
-
- virtual Role getRole() const;
- virtual MemberState getMemberState() const;
- virtual bool canAcceptWrites() const override;
- virtual bool isSteppingDown() const override;
- virtual HostAndPort getSyncSourceAddress() const;
- virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
- virtual int getMaintenanceCount() const;
- virtual long long getTerm();
- virtual UpdateTermResult updateTerm(long long term, Date_t now);
- virtual void setForceSyncSourceIndex(int index);
- virtual HostAndPort chooseNewSyncSource(Date_t now,
- const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
- virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
- virtual void clearSyncSourceBlacklist();
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- boost::optional<rpc::OplogQueryMetadata> oqMetadata,
- Date_t now) const;
- virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
- virtual void setElectionSleepUntil(Date_t newTime);
- virtual void setFollowerMode(MemberState::MS newMode);
- virtual bool updateLastCommittedOpTime();
- virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
- virtual OpTime getLastCommittedOpTime() const;
- virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override;
- virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override;
- virtual void adjustMaintenanceCountBy(int inc);
- virtual void prepareSyncFromResponse(const HostAndPort& target,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual Status prepareHeartbeatResponse(Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual Status prepareHeartbeatResponseV1(Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
- BSONObjBuilder* response,
- Status* result);
- virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
- OpTime currentCommittedSnapshotOpTime) const;
-
- virtual void fillIsMasterForReplSet(IsMasterResponse* response);
- virtual void fillMemberData(BSONObjBuilder* result);
- virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
- int secs,
- BSONObjBuilder* response);
- virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
- virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
- Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
- Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual HeartbeatResponseAction processHeartbeatResponse(
- Date_t now,
- Milliseconds networkRoundTripTime,
- const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
- virtual bool voteForMyself(Date_t now);
- virtual void setElectionInfo(OID electionId, Timestamp electionOpTime);
- virtual void processWinElection(OID electionId, Timestamp electionOpTime);
- virtual void processLoseElection();
- virtual Status checkShouldStandForElection(Date_t now) const;
- virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message);
- virtual bool attemptStepDown(long long termAtStart,
- Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force) override;
- virtual bool isSafeToStepDown() override;
- virtual bool prepareForUnconditionalStepDown() override;
- virtual Status prepareForStepDownAttempt() override;
- virtual void abortAttemptedStepDownIfNeeded() override;
- virtual void finishUnconditionalStepDown() override;
- virtual Date_t getStepDownTime() const;
- virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
- virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
- virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response);
- virtual void summarizeAsHtml(ReplSetHtmlSummary* output);
- virtual void loadLastVote(const LastVote& lastVote);
- virtual void voteForMyselfV1();
- virtual void setPrimaryIndex(long long primaryIndex);
- virtual int getCurrentPrimaryIndex() const;
- virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
- virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
- const ReplSetTagPattern& tagPattern,
- bool durablyWritten);
- virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
- bool durablyWritten,
- bool skipSelf);
- virtual bool setMemberAsDown(Date_t now, const int memberIndex) override;
- virtual std::pair<int, Date_t> getStalestLiveMember() const;
- virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now);
- virtual void resetAllMemberTimeouts(Date_t now);
- virtual void resetMemberTimeouts(Date_t now,
- const stdx::unordered_set<HostAndPort>& member_set);
- virtual OpTime getMyLastAppliedOpTime() const;
- virtual OpTime getMyLastDurableOpTime() const;
- virtual MemberData* getMyMemberData();
- virtual MemberData* findMemberDataByMemberId(const int memberId);
- virtual MemberData* findMemberDataByRid(const OID rid);
- virtual MemberData* addSlaveMemberData(const OID rid);
- virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
- virtual void setStorageEngineSupportsReadCommitted(bool supported);
-
- virtual void restartHeartbeats();
-
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
-
- ////////////////////////////////////////////////////////////
- //
- // Test support methods
- //
- ////////////////////////////////////////////////////////////
-
- // Changes _memberState to newMemberState. Only for testing.
- void changeMemberState_forTest(const MemberState& newMemberState,
- const Timestamp& electionTime = Timestamp(0, 0));
-
- // Sets "_electionTime" to "newElectionTime". Only for testing.
- void _setElectionTime(const Timestamp& newElectionTime);
-
- // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests!
- // TODO(spencer): Remove this once we can easily call for an election in unit tests to
- // set the current primary.
- void _setCurrentPrimaryForTest(int primaryIndex);
-
- // Returns _electionTime. Only used in unittests.
- Timestamp getElectionTime() const;
-
- // Returns _electionId. Only used in unittests.
- OID getElectionId() const;
-
-private:
- enum UnelectableReason {
- None = 0,
- CannotSeeMajority = 1 << 0,
- NotCloseEnoughToLatestOptime = 1 << 1,
- ArbiterIAm = 1 << 2,
- NotSecondary = 1 << 3,
- NoPriority = 1 << 4,
- StepDownPeriodActive = 1 << 5,
- NoData = 1 << 6,
- NotInitialized = 1 << 7,
- VotedTooRecently = 1 << 8,
- RefusesToStand = 1 << 9,
- NotCloseEnoughToLatestForPriorityTakeover = 1 << 10,
- NotFreshEnoughForCatchupTakeover = 1 << 11,
- };
- typedef int UnelectableReasonMask;
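Aside for readers of this hunk: the UnelectableReason values above are distinct bit flags, so several reasons can be OR-ed into a single UnelectableReasonMask and then decoded one flag at a time (which is what _getUnelectableReasonString() does for the full set). The standalone sketch below is illustrative only and not part of this patch; the helper name describeUnelectable and the reduced flag set are assumptions made for brevity.

    #include <string>

    enum UnelectableReason {
        None = 0,
        CannotSeeMajority = 1 << 0,
        ArbiterIAm = 1 << 2,
        NotSecondary = 1 << 3,
    };
    typedef int UnelectableReasonMask;

    // Walks the OR-ed mask one flag at a time and builds a human-readable summary.
    std::string describeUnelectable(UnelectableReasonMask ur) {
        if (ur == None)
            return "electable";
        std::string out;
        if (ur & CannotSeeMajority)
            out += "cannot see a majority; ";
        if (ur & ArbiterIAm)
            out += "member is an arbiter; ";
        if (ur & NotSecondary)
            out += "member is not in SECONDARY state; ";
        return out;
    }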
-
-
- // Set what type of PRIMARY this node currently is.
- void _setLeaderMode(LeaderMode mode);
-
- // Returns the number of heartbeat pings which have occurred.
- int _getTotalPings();
-
- // Returns the current "ping" value for the given member by their address
- Milliseconds _getPing(const HostAndPort& host);
-
- // Determines if we will veto the member specified by "args.id".
- // If we veto, the errmsg will be filled in with a reason
- bool _shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t& now,
- std::string* errmsg) const;
-
- // Returns the index of the member with the matching id, or -1 if none match.
- int _getMemberIndex(int id) const;
-
- // Sees if a majority number of votes are held by members who are currently "up"
- bool _aMajoritySeemsToBeUp() const;
-
- // Checks if the node can see a healthy primary of equal or greater priority to the
- // candidate. If so, returns the index of that node. Otherwise returns -1.
- int _findHealthyPrimaryOfEqualOrGreaterPriority(const int candidateIndex) const;
-
- // Is otherOpTime close enough (within 10 seconds) to the latest known optime to qualify
- // for an election
- bool _isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const;
-
- // Is our optime close enough to the latest known optime to call for a priority takeover.
- bool _amIFreshEnoughForPriorityTakeover() const;
-
- // Is the primary node still in catchup mode and is our optime the latest
- // known optime of all the up nodes.
- bool _amIFreshEnoughForCatchupTakeover() const;
-
- // Returns reason why "self" member is unelectable
- UnelectableReasonMask _getMyUnelectableReason(const Date_t now,
- StartElectionReason reason) const;
-
- // Returns reason why memberIndex is unelectable
- UnelectableReasonMask _getUnelectableReason(int memberIndex) const;
-
- // Returns human-readable text describing why the node is unelectable
- std::string _getUnelectableReasonString(UnelectableReasonMask ur) const;
-
- // Return true if we are currently primary
- bool _iAmPrimary() const;
-
- // Scans through all members that are 'up' and return the latest known optime.
- OpTime _latestKnownOpTime() const;
-
- // Scans the electable set and returns the highest priority member index
- int _getHighestPriorityElectableIndex(Date_t now) const;
-
- // Returns true if "one" member is higher priority than "two" member
- bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const;
-
- // Helper shortcut to self config
- const MemberConfig& _selfConfig() const;
-
- // Helper shortcut to self member data
- const MemberData& _selfMemberData() const;
-
- // Index of self member in member data.
- const int _selfMemberDataIndex() const;
-
- // Returns NULL if there is no primary, or the MemberConfig* for the current primary
- const MemberConfig* _currentPrimaryMember() const;
-
- /**
- * Updates "_currentPrimaryIndex" on behalf of processHeartbeatResponse(), and determines
- * whether an election or stepdown should commence.
- * _updatePrimaryFromHBDataV1() is a simplified version of _updatePrimaryFromHBData(), to be
- * used when running protocol version 1.
- */
- HeartbeatResponseAction _updatePrimaryFromHBData(int updatedConfigIndex,
- const MemberState& originalState,
- Date_t now);
- HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex,
- const MemberState& originalState,
- Date_t now);
-
- /**
- * Updates _memberData based on the newConfig, ensuring that every member in the newConfig
- * has an entry in _memberData. If any nodes in the newConfig are also present in
- * _rsConfig, copies their heartbeat info into the corresponding entry in the updated
- * _memberData vector.
- */
- void _updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
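As described in the comment above, a reconfig rebuilds the member-data vector so that it stays parallel to the new config while preserving heartbeat state for members that survive the reconfig. A minimal standalone sketch of that idea, with simplified stand-in types (HeartbeatData and rebuildForReconfig are hypothetical names, not this class's API):

    #include <string>
    #include <vector>

    struct HeartbeatData {
        int pingsReceived = 0;  // stand-in for the real heartbeat/optime state
    };

    std::vector<HeartbeatData> rebuildForReconfig(const std::vector<std::string>& newHosts,
                                                  const std::vector<std::string>& oldHosts,
                                                  const std::vector<HeartbeatData>& oldData) {
        std::vector<HeartbeatData> updated;
        updated.reserve(newHosts.size());
        for (const auto& host : newHosts) {
            HeartbeatData entry;  // fresh entry for members new to the config
            for (size_t i = 0; i < oldHosts.size(); ++i) {
                if (oldHosts[i] == host) {
                    entry = oldData[i];  // carry over existing heartbeat info
                    break;
                }
            }
            updated.push_back(entry);
        }
        return updated;
    }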
-
- /**
- * Returns whether a stepdown attempt should be allowed to proceed. See the comment for
- * attemptStepDown() for more details on the rules of when stepdown attempts succeed or fail.
- */
- bool _canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force);
-
- void _stepDownSelfAndReplaceWith(int newPrimary);
-
- /**
- * Looks up the provided member in the blacklist and returns true if the member's blacklist
- * expire time is after 'now'. If the member is found but the expire time is before 'now',
- * the function returns false. If the member is not found in the blacklist, the function
- * returns false.
- **/
- bool _memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const;
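The blacklist check described above boils down to a map lookup plus an expiry comparison. A minimal standalone sketch of the same rule, using std::chrono in place of Date_t and a plain host string in place of MemberConfig (isBlacklisted is a hypothetical helper, not this class's API):

    #include <chrono>
    #include <map>
    #include <string>

    using TimePoint = std::chrono::system_clock::time_point;

    // A host counts as blacklisted only while its recorded expire time is still in the future.
    bool isBlacklisted(const std::map<std::string, TimePoint>& blacklist,
                       const std::string& host,
                       TimePoint now) {
        auto it = blacklist.find(host);
        if (it == blacklist.end()) {
            return false;  // never blacklisted
        }
        return it->second > now;  // an expired entry no longer blocks the host
    }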
-
- /**
- * Returns true if we are a one-node replica set, we're the one member,
- * we're electable, we're not in maintenance mode, and we are currently in followerMode
- * SECONDARY.
- *
- * This is used to decide if we should transition to Role::candidate in a one-node replica set.
- */
- bool _isElectableNodeInSingleNodeReplicaSet() const;
-
- // This node's role in the replication protocol.
- Role _role;
-
- // This is a unique id that is generated and set each time we transition to PRIMARY, as the
- // result of an election.
- OID _electionId;
- // The time at which the current PRIMARY was elected.
- Timestamp _electionTime;
-
- // This node's election term. The term is used as part of the consensus algorithm to elect
- // and maintain one primary (leader) node in the cluster.
- long long _term;
-
- // the index of the member we currently believe is primary, if one exists, otherwise -1
- int _currentPrimaryIndex;
-
- // The HostAndPort we are currently syncing from.
- // Empty if we have no sync source (we are primary, or we cannot connect to anyone yet).
- HostAndPort _syncSource;
- // These members are not chosen as sync sources for a period of time, due to connection
- // issues with them
- std::map<HostAndPort, Date_t> _syncSourceBlacklist;
- // The next sync source to be chosen, requested via a replSetSyncFrom command
- int _forceSyncSourceIndex;
-
- // Options for this TopologyCoordinator
- Options _options;
-
- // "heartbeat message"
- // sent in the requestHeartbeat response, in the field "hbm"
- std::string _hbmsg;
- Date_t _hbmsgTime; // when it was logged
-
- // heartbeat msg to send to others; descriptive diagnostic info
- std::string _getHbmsg(Date_t now) const;
-
- int _selfIndex; // this node's index in _members and _currentConfig
-
- ReplSetConfig _rsConfig; // The current config, including a vector of MemberConfigs
-
- // Heartbeat, current applied/durable optime, and other state data for each member. It is
- // guaranteed that this vector will be maintained in the same order as the MemberConfigs in
- // _rsConfig, therefore the member config index can be used to index into this vector as
- // well.
- std::vector<MemberData> _memberData;
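The ordering guarantee described above means a single index is valid in both the config's member list and _memberData. Purely as an illustration (the Lite types and the lookup helper below are invented for this sketch):

    #include <string>
    #include <vector>

    struct MemberConfigLite {
        std::string host;
    };
    struct MemberDataLite {
        bool up = false;
    };

    // Because both vectors are kept in the same order, the index found in the config
    // list can be reused directly to read that member's heartbeat data.
    bool isMemberUp(const std::vector<MemberConfigLite>& configs,
                    const std::vector<MemberDataLite>& memberData,
                    const std::string& host) {
        for (size_t i = 0; i < configs.size(); ++i) {
            if (configs[i].host == host) {
                return memberData[i].up;
            }
        }
        return false;  // unknown host
    }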
-
- // Time when stepDown command expires
- Date_t _stepDownUntil;
-
- // A time before which this node will not stand for election.
- // In protocol version 1, this is used to prevent running for election after seeing
- // a new term.
- Date_t _electionSleepUntil;
-
- // OpTime of the latest committed operation.
- OpTime _lastCommittedOpTime;
-
- // OpTime representing our transition to PRIMARY and the start of our term.
- // _lastCommittedOpTime cannot be set to an earlier OpTime.
- OpTime _firstOpTimeOfMyTerm;
-
- // The number of calls we have had to enter maintenance mode
- int _maintenanceModeCalls;
-
- // The sub-mode of follower that we are in. Legal values are RS_SECONDARY, RS_RECOVERING,
- // RS_STARTUP2 (initial sync) and RS_ROLLBACK. Only meaningful if _role == Role::follower.
- // Configured via setFollowerMode(). If the sub-mode is RS_SECONDARY, then the effective
- // sub-mode is either RS_SECONDARY or RS_RECOVERING, depending on _maintenanceModeCalls.
- // Rather than accessing this variable directly, one should use the getMemberState() method,
- // which computes the replica set node state on the fly.
- MemberState::MS _followerMode;
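To make the interaction between _followerMode and _maintenanceModeCalls concrete: while maintenance mode is in effect, a node whose sub-mode is RS_SECONDARY reports itself as RS_RECOVERING. The sketch below is a simplified standalone analog; the enum and function names are stand-ins, not the real MemberState API.

    enum class FollowerState { kSecondary, kRecovering, kRollback, kStartup2 };

    // Mirrors the rule described above: maintenance mode masks SECONDARY as RECOVERING,
    // while the other follower sub-modes are reported unchanged.
    FollowerState effectiveFollowerState(FollowerState configured, int maintenanceModeCalls) {
        if (configured == FollowerState::kSecondary && maintenanceModeCalls > 0) {
            return FollowerState::kRecovering;
        }
        return configured;
    }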
-
- // What type of PRIMARY this node currently is. Don't set this directly; call _setLeaderMode()
- // instead.
- LeaderMode _leaderMode = LeaderMode::kNotLeader;
-
- typedef std::map<HostAndPort, PingStats> PingMap;
- // Ping stats for each member by HostAndPort.
- PingMap _pings;
-
- // Last vote info from the election
- struct VoteLease {
- static const Seconds leaseTime;
-
- Date_t when;
- int whoId = -1;
- HostAndPort whoHostAndPort;
- } _voteLease;
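The VoteLease record above supports a simple rule of thumb: do not grant a new vote to a different candidate while the previous vote's lease is still running. The standalone sketch below captures only that idea and is an assumption for illustration; the real veto logic in this coordinator involves additional checks.

    #include <chrono>

    struct VoteLeaseLite {
        std::chrono::system_clock::time_point when{};
        int whoId = -1;  // -1 means we have not voted yet
    };

    bool mayGrantVote(const VoteLeaseLite& lease,
                      int candidateId,
                      std::chrono::system_clock::time_point now,
                      std::chrono::seconds leaseTime) {
        if (lease.whoId == -1 || lease.whoId == candidateId) {
            return true;  // no prior vote, or the same candidate asking again
        }
        return now >= lease.when + leaseTime;  // otherwise wait for the lease to expire
    }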
-
- // V1 last vote info for elections
- LastVote _lastVote{OpTime::kInitialTerm, -1};
-
- enum class ReadCommittedSupport {
- kUnknown,
- kNo,
- kYes,
- };
-
- // Whether or not the storage engine supports read committed.
- ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_test.cpp
index a22aec9f72f..a22aec9f72f 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_test.cpp
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 048bb17b573..048bb17b573 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp