summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/topology_coordinator_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/topology_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp3770
1 files changed, 1822 insertions, 1948 deletions
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 08d7ac7198c..a2905c1eacb 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -55,1308 +55,1232 @@
namespace mongo {
namespace repl {
- using std::vector;
+using std::vector;
- const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30);
+const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30);
namespace {
- template <typename T>
- int indexOfIterator(const std::vector<T>& vec,
- typename std::vector<T>::const_iterator& it) {
- return static_cast<int>(it - vec.begin());
- }
-
- // Interval between the time the last heartbeat from a node was received successfully, or
- // the time when we gave up retrying, and when the next heartbeat should be sent to a target.
- const auto kHeartbeatInterval = Seconds{2};
+template <typename T>
+int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
+ return static_cast<int>(it - vec.begin());
+}
- // Maximum number of retries for a failed heartbeat.
- const int kMaxHeartbeatRetries = 2;
+// Interval between the time the last heartbeat from a node was received successfully, or
+// the time when we gave up retrying, and when the next heartbeat should be sent to a target.
+const auto kHeartbeatInterval = Seconds{2};
- /**
- * Returns true if the only up heartbeats are auth errors.
- */
- bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberHeartbeatData>& hbdata,
- const int selfIndex) {
- bool foundAuthError = false;
- for (std::vector<MemberHeartbeatData>::const_iterator it = hbdata.begin();
- it != hbdata.end();
- ++it) {
- if (indexOfIterator(hbdata, it) == selfIndex) {
- continue;
- }
+// Maximum number of retries for a failed heartbeat.
+const int kMaxHeartbeatRetries = 2;
- if (it->up()) {
- return false;
- }
+/**
+ * Returns true if the only up heartbeats are auth errors.
+ */
+bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberHeartbeatData>& hbdata,
+ const int selfIndex) {
+ bool foundAuthError = false;
+ for (std::vector<MemberHeartbeatData>::const_iterator it = hbdata.begin(); it != hbdata.end();
+ ++it) {
+ if (indexOfIterator(hbdata, it) == selfIndex) {
+ continue;
+ }
- if (it->hasAuthIssue()) {
- foundAuthError = true;
- }
+ if (it->up()) {
+ return false;
}
- return foundAuthError;
+ if (it->hasAuthIssue()) {
+ foundAuthError = true;
+ }
}
-} // namespace
+ return foundAuthError;
+}
- PingStats::PingStats() :
- count(0),
- value(std::numeric_limits<unsigned int>::max()),
- _numFailuresSinceLastStart(std::numeric_limits<int>::max()) {
- }
+} // namespace
- void PingStats::start(Date_t now) {
- _lastHeartbeatStartDate = now;
- _numFailuresSinceLastStart = 0;
+PingStats::PingStats()
+ : count(0),
+ value(std::numeric_limits<unsigned int>::max()),
+ _numFailuresSinceLastStart(std::numeric_limits<int>::max()) {}
+
+void PingStats::start(Date_t now) {
+ _lastHeartbeatStartDate = now;
+ _numFailuresSinceLastStart = 0;
+}
+
+void PingStats::hit(int millis) {
+ _numFailuresSinceLastStart = std::numeric_limits<int>::max();
+ ++count;
+ value = value == std::numeric_limits<unsigned int>::max()
+ ? millis
+ : static_cast<unsigned long>((value * .8) + (millis * .2));
+}
+
+void PingStats::miss() {
+ ++_numFailuresSinceLastStart;
+}
+
+TopologyCoordinatorImpl::TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs)
+ : _role(Role::follower),
+ _term(0),
+ _currentPrimaryIndex(-1),
+ _forceSyncSourceIndex(-1),
+ _maxSyncSourceLagSecs(maxSyncSourceLagSecs),
+ _selfIndex(-1),
+ _stepDownPending(false),
+ _maintenanceModeCalls(0),
+ _followerMode(MemberState::RS_STARTUP2) {
+ invariant(getMemberState() == MemberState::RS_STARTUP);
+}
+
+TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const {
+ return _role;
+}
+
+void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
+ invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
+ _forceSyncSourceIndex = index;
+}
+
+HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
+ return _syncSource;
+}
+
+HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) {
+ // If we are primary, then we aren't syncing from anyone (else).
+ if (_iAmPrimary()) {
+ return HostAndPort();
+ }
+
+ // If we are not a member of the current replica set configuration, no sync source is valid.
+ if (_selfIndex == -1) {
+ LOG(2) << "Cannot sync from any members because we are not in the replica set config";
+ return HostAndPort();
+ }
+
+ // if we have a target we've requested to sync from, use it
+ if (_forceSyncSourceIndex != -1) {
+ invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
+ _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
+ _forceSyncSourceIndex = -1;
+ std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
+ << " by request");
+ log() << msg << rsLog;
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
}
- void PingStats::hit(int millis) {
- _numFailuresSinceLastStart = std::numeric_limits<int>::max();
- ++count;
- value = value == std::numeric_limits<unsigned int>::max() ? millis :
- static_cast<unsigned long>((value * .8) + (millis * .2));
- }
+ // wait for 2N pings (not counting ourselves) before choosing a sync target
+ int needMorePings = (_hbdata.size() - 1) * 2 - _getTotalPings();
- void PingStats::miss() {
- ++_numFailuresSinceLastStart;
+ if (needMorePings > 0) {
+ OCCASIONALLY log() << "waiting for " << needMorePings
+ << " pings from other members before syncing";
+ _syncSource = HostAndPort();
+ return _syncSource;
}
- TopologyCoordinatorImpl::TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs) :
- _role(Role::follower),
- _term(0),
- _currentPrimaryIndex(-1),
- _forceSyncSourceIndex(-1),
- _maxSyncSourceLagSecs(maxSyncSourceLagSecs),
- _selfIndex(-1),
- _stepDownPending(false),
- _maintenanceModeCalls(0),
- _followerMode(MemberState::RS_STARTUP2)
- {
- invariant(getMemberState() == MemberState::RS_STARTUP);
+ // If we are only allowed to sync from the primary, set that
+ if (!_rsConfig.isChainingAllowed()) {
+ if (_currentPrimaryIndex == -1) {
+ LOG(1) << "Cannot select sync source because chaining is"
+ " not allowed and primary is unknown/down";
+ _syncSource = HostAndPort();
+ return _syncSource;
+ } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
+ LOG(1) << "Cannot select sync source because chaining is"
+ "not allowed and primary is not currently accepting our updates";
+ _syncSource = HostAndPort();
+ return _syncSource;
+ } else {
+ _syncSource = _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort();
+ std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
+ log() << msg << rsLog;
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
+ }
}
- TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const {
- return _role;
- }
+ // find the member with the lowest ping time that is ahead of me
- void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
- invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
- _forceSyncSourceIndex = index;
+ // Find primary's oplog time. Reject sync candidates that are more than
+ // maxSyncSourceLagSecs seconds behind.
+ OpTime primaryOpTime;
+ if (_currentPrimaryIndex != -1) {
+ primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime();
+ } else {
+ // choose a time that will exclude no candidates, since we don't see a primary
+ primaryOpTime = OpTime(Timestamp(_maxSyncSourceLagSecs, 0), 0);
}
- HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
- return _syncSource;
+ if (primaryOpTime.getSecs() < static_cast<unsigned int>(_maxSyncSourceLagSecs.count())) {
+ // erh - I think this means there was just a new election
+ // and we don't yet know the new primary's optime
+ primaryOpTime = OpTime(Timestamp(_maxSyncSourceLagSecs, 0), 0);
}
- HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
- const OpTime& lastOpApplied) {
- // If we are primary, then we aren't syncing from anyone (else).
- if (_iAmPrimary()) {
- return HostAndPort();
- }
+ OpTime oldestSyncOpTime(Timestamp(primaryOpTime.getSecs() - _maxSyncSourceLagSecs.count(), 0),
+ primaryOpTime.getTerm());
- // If we are not a member of the current replica set configuration, no sync source is valid.
- if (_selfIndex == -1) {
- LOG(2) << "Cannot sync from any members because we are not in the replica set config";
- return HostAndPort();
- }
+ int closestIndex = -1;
- // if we have a target we've requested to sync from, use it
- if (_forceSyncSourceIndex != -1) {
- invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
- _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
- _forceSyncSourceIndex = -1;
- std::string msg(str::stream() << "syncing from: "
- << _syncSource.toString() << " by request");
- log() << msg << rsLog;
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
- }
-
- // wait for 2N pings (not counting ourselves) before choosing a sync target
- int needMorePings = (_hbdata.size() - 1) * 2 - _getTotalPings();
-
- if (needMorePings > 0) {
- OCCASIONALLY log() << "waiting for " << needMorePings
- << " pings from other members before syncing";
- _syncSource = HostAndPort();
- return _syncSource;
- }
-
- // If we are only allowed to sync from the primary, set that
- if (!_rsConfig.isChainingAllowed()) {
- if (_currentPrimaryIndex == -1) {
- LOG(1) << "Cannot select sync source because chaining is"
- " not allowed and primary is unknown/down";
- _syncSource = HostAndPort();
- return _syncSource;
+ // Make two attempts. The first attempt, we ignore those nodes with
+ // slave delay higher than our own, hidden nodes, and nodes that are excessively lagged.
+ // The second attempt includes such nodes, in case those are the only ones we can reach.
+ // This loop attempts to set 'closestIndex'.
+ for (int attempts = 0; attempts < 2; ++attempts) {
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
+ it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ // Don't consider ourselves.
+ if (itIndex == _selfIndex) {
+ continue;
}
- else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
- LOG(1) << "Cannot select sync source because chaining is"
- "not allowed and primary is not currently accepting our updates";
- _syncSource = HostAndPort();
- return _syncSource;
+ // Candidate must be up to be considered.
+ if (!it->up()) {
+ continue;
}
- else {
- _syncSource = _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort();
- std::string msg(str::stream() << "syncing from primary: "
- << _syncSource.toString());
- log() << msg << rsLog;
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
+ // Candidate must be PRIMARY or SECONDARY state to be considered.
+ if (!it->getState().readable()) {
+ continue;
}
- }
- // find the member with the lowest ping time that is ahead of me
+ const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));
- // Find primary's oplog time. Reject sync candidates that are more than
- // maxSyncSourceLagSecs seconds behind.
- OpTime primaryOpTime;
- if (_currentPrimaryIndex != -1) {
- primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime();
- }
- else {
- // choose a time that will exclude no candidates, since we don't see a primary
- primaryOpTime = OpTime(Timestamp(_maxSyncSourceLagSecs, 0), 0);
- }
-
- if (primaryOpTime.getSecs() <
- static_cast<unsigned int>(_maxSyncSourceLagSecs.count())) {
- // erh - I think this means there was just a new election
- // and we don't yet know the new primary's optime
- primaryOpTime = OpTime(Timestamp(_maxSyncSourceLagSecs, 0), 0);
- }
-
- OpTime oldestSyncOpTime(
- Timestamp(primaryOpTime.getSecs() - _maxSyncSourceLagSecs.count(), 0),
- primaryOpTime.getTerm());
-
- int closestIndex = -1;
-
- // Make two attempts. The first attempt, we ignore those nodes with
- // slave delay higher than our own, hidden nodes, and nodes that are excessively lagged.
- // The second attempt includes such nodes, in case those are the only ones we can reach.
- // This loop attempts to set 'closestIndex'.
- for (int attempts = 0; attempts < 2; ++attempts) {
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- // Don't consider ourselves.
- if (itIndex == _selfIndex) {
- continue;
- }
- // Candidate must be up to be considered.
- if (!it->up()) {
- continue;
- }
- // Candidate must be PRIMARY or SECONDARY state to be considered.
- if (!it->getState().readable()) {
- continue;
- }
-
- const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));
-
- // Candidate must build indexes if we build indexes, to be considered.
- if (_selfConfig().shouldBuildIndexes()) {
- if (!itMemberConfig.shouldBuildIndexes()) {
- continue;
- }
- }
-
- // only consider candidates that are ahead of where we are
- if (it->getOpTime() <= lastOpApplied) {
+ // Candidate must build indexes if we build indexes, to be considered.
+ if (_selfConfig().shouldBuildIndexes()) {
+ if (!itMemberConfig.shouldBuildIndexes()) {
continue;
}
+ }
- // omit candidates that are excessively behind, on the first attempt at least.
- if (attempts == 0 &&
- it->getOpTime() < oldestSyncOpTime) {
- continue;
- }
+ // only consider candidates that are ahead of where we are
+ if (it->getOpTime() <= lastOpApplied) {
+ continue;
+ }
- // omit nodes that are more latent than anything we've already considered
- if ((closestIndex != -1) &&
- (_getPing(itMemberConfig.getHostAndPort())
- > _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
- continue;
- }
+ // omit candidates that are excessively behind, on the first attempt at least.
+ if (attempts == 0 && it->getOpTime() < oldestSyncOpTime) {
+ continue;
+ }
- if (attempts == 0) {
- if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()
- || itMemberConfig.isHidden()) {
- continue; // skip this one in the first attempt
- }
- }
+ // omit nodes that are more latent than anything we've already considered
+ if ((closestIndex != -1) &&
+ (_getPing(itMemberConfig.getHostAndPort()) >
+ _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
+ continue;
+ }
- if (_memberIsBlacklisted(itMemberConfig, now)) {
- continue;
+ if (attempts == 0) {
+ if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay() ||
+ itMemberConfig.isHidden()) {
+ continue; // skip this one in the first attempt
}
-
- // This candidate has passed all tests; set 'closestIndex'
- closestIndex = itIndex;
}
- if (closestIndex != -1) break; // no need for second attempt
- }
- if (closestIndex == -1) {
- // Did not find any members to sync from
- std::string msg("could not find member to sync from");
- // Only log when we had a valid sync source before
- if (!_syncSource.empty()) {
- log() << msg << rsLog;
+ if (_memberIsBlacklisted(itMemberConfig, now)) {
+ continue;
}
- setMyHeartbeatMessage(now, msg);
- _syncSource = HostAndPort();
- return _syncSource;
+ // This candidate has passed all tests; set 'closestIndex'
+ closestIndex = itIndex;
}
- _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
- std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0);
- log() << msg << rsLog;
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
+ if (closestIndex != -1)
+ break; // no need for second attempt
}
- bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig,
- Date_t now) const {
- std::map<HostAndPort,Date_t>::const_iterator blacklisted =
- _syncSourceBlacklist.find(memberConfig.getHostAndPort());
- if (blacklisted != _syncSourceBlacklist.end()) {
- if (blacklisted->second > now) {
- return true;
- }
+ if (closestIndex == -1) {
+ // Did not find any members to sync from
+ std::string msg("could not find member to sync from");
+ // Only log when we had a valid sync source before
+ if (!_syncSource.empty()) {
+ log() << msg << rsLog;
}
- return false;
- }
+ setMyHeartbeatMessage(now, msg);
- void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
- LOG(2) << "blacklisting " << host << " until " << until.toString();
- _syncSourceBlacklist[host] = until;
+ _syncSource = HostAndPort();
+ return _syncSource;
}
-
- void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
- std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
- if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
- LOG(2) << "unblacklisting " << host;
- _syncSourceBlacklist.erase(hostItr);
+ _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
+ std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0);
+ log() << msg << rsLog;
+ setMyHeartbeatMessage(now, msg);
+ return _syncSource;
+}
+
+bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig,
+ Date_t now) const {
+ std::map<HostAndPort, Date_t>::const_iterator blacklisted =
+ _syncSourceBlacklist.find(memberConfig.getHostAndPort());
+ if (blacklisted != _syncSourceBlacklist.end()) {
+ if (blacklisted->second > now) {
+ return true;
}
}
+ return false;
+}
- void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
- _syncSourceBlacklist.clear();
- }
-
- void TopologyCoordinatorImpl::prepareSyncFromResponse(
- const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
- const OpTime& lastOpApplied,
- BSONObjBuilder* response,
- Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- response->append("syncFromRequested", target.toString());
-
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::NotSecondary,
- "Removed and uninitialized nodes do not sync");
- return;
- }
-
- const MemberConfig& selfConfig = _selfConfig();
- if (selfConfig.isArbiter()) {
- *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
- return;
- }
- if (_selfIndex == _currentPrimaryIndex) {
- *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
- return;
- }
+void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ LOG(2) << "blacklisting " << host << " until " << until.toString();
+ _syncSourceBlacklist[host] = until;
+}
- ReplicaSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
- int targetIndex = 0;
- for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
- it != _rsConfig.membersEnd(); ++it) {
- if (it->getHostAndPort() == target) {
- targetConfig = it;
- break;
- }
- ++targetIndex;
- }
- if (targetConfig == _rsConfig.membersEnd()) {
- *result = Status(ErrorCodes::NodeNotFound,
- str::stream() << "Could not find member \"" << target.toString() <<
- "\" in replica set");
- return;
- }
- if (targetIndex == _selfIndex) {
- *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
- return;
- }
- if (targetConfig->isArbiter()) {
- *result = Status(ErrorCodes::InvalidOptions,
- str::stream() << "Cannot sync from \"" << target.toString() <<
- "\" because it is an arbiter");
- return;
- }
- if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
- *result = Status(ErrorCodes::InvalidOptions,
- str::stream() << "Cannot sync from \"" << target.toString() <<
- "\" because it does not build indexes");
- return;
- }
-
- const MemberHeartbeatData& hbdata = _hbdata[targetIndex];
- if (hbdata.hasAuthIssue()) {
- *result = Status(ErrorCodes::Unauthorized,
- str::stream() << "not authorized to communicate with " <<
- target.toString());
- return;
- }
- if (hbdata.getHealth() == 0) {
- *result = Status(ErrorCodes::HostUnreachable,
- str::stream() << "I cannot reach the requested member: " <<
- target.toString());
- return;
- }
- if (hbdata.getOpTime().getSecs()+10 < lastOpApplied.getSecs()) {
- warning() << "attempting to sync from " << target
- << ", but its latest opTime is " << hbdata.getOpTime().getSecs()
- << " and ours is " << lastOpApplied.getSecs() << " so this may not work";
- response->append("warning",
- str::stream() << "requested member \"" << target.toString() <<
- "\" is more than 10 seconds behind us");
- // not returning bad Status, just warning
- }
-
- HostAndPort prevSyncSource = getSyncSourceAddress();
- if (!prevSyncSource.empty()) {
- response->append("prevSyncTarget", prevSyncSource.toString());
- }
-
- setForceSyncSourceIndex(targetIndex);
- *result = Status::OK();
+void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
+ std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
+ if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
+ LOG(2) << "unblacklisting " << host;
+ _syncSourceBlacklist.erase(hostItr);
}
+}
- void TopologyCoordinatorImpl::prepareFreshResponse(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t now,
- const OpTime& lastOpApplied,
- BSONObjBuilder* response,
- Status* result) {
-
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::ReplicaSetNotFound,
- "Cannot participate in elections because not initialized");
- return;
- }
+void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
+ _syncSourceBlacklist.clear();
+}
- if (args.setName != _rsConfig.getReplSetName()) {
- *result = Status(ErrorCodes::ReplicaSetNotFound,
- str::stream() << "Wrong repl set name. Expected: " <<
- _rsConfig.getReplSetName() <<
- ", received: " << args.setName);
- return;
- }
+void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
+ const HostAndPort& target,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (data.status == ErrorCodes::CallbackCanceled) {
+ *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
+ return;
+ }
- if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
- *result = Status(ErrorCodes::BadValue,
- str::stream() << "Received replSetFresh command from member with the "
- "same member ID as ourself: " << args.id);
- return;
- }
+ response->append("syncFromRequested", target.toString());
- bool weAreFresher = false;
- if( _rsConfig.getConfigVersion() > args.cfgver ) {
- log() << "replSet member " << args.who << " is not yet aware its cfg version "
- << args.cfgver << " is stale";
- response->append("info", "config version stale");
- weAreFresher = true;
- }
- // check not only our own optime, but any other member we can reach
- else if (OpTime(args.opTime, _term) < _latestKnownOpTime(lastOpApplied)) {
- weAreFresher = true;
- }
- response->appendDate("opTime",
- Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
- response->append("fresher", weAreFresher);
-
- std::string errmsg;
- bool doVeto = _shouldVetoMember(args, now, lastOpApplied, &errmsg);
- response->append("veto", doVeto);
- if (doVeto) {
- response->append("errmsg", errmsg);
- }
- *result = Status::OK();
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync");
+ return;
}
- bool TopologyCoordinatorImpl::_shouldVetoMember(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t& now,
- const OpTime& lastOpApplied,
- std::string* errmsg) const {
+ const MemberConfig& selfConfig = _selfConfig();
+ if (selfConfig.isArbiter()) {
+ *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
+ return;
+ }
+ if (_selfIndex == _currentPrimaryIndex) {
+ *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
+ return;
+ }
- if (_rsConfig.getConfigVersion() < args.cfgver) {
- // We are stale; do not veto.
- return false;
+ ReplicaSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
+ int targetIndex = 0;
+ for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
+ it != _rsConfig.membersEnd();
+ ++it) {
+ if (it->getHostAndPort() == target) {
+ targetConfig = it;
+ break;
}
+ ++targetIndex;
+ }
+ if (targetConfig == _rsConfig.membersEnd()) {
+ *result = Status(ErrorCodes::NodeNotFound,
+ str::stream() << "Could not find member \"" << target.toString()
+ << "\" in replica set");
+ return;
+ }
+ if (targetIndex == _selfIndex) {
+ *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
+ return;
+ }
+ if (targetConfig->isArbiter()) {
+ *result = Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot sync from \"" << target.toString()
+ << "\" because it is an arbiter");
+ return;
+ }
+ if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
+ *result = Status(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot sync from \"" << target.toString()
+ << "\" because it does not build indexes");
+ return;
+ }
+
+ const MemberHeartbeatData& hbdata = _hbdata[targetIndex];
+ if (hbdata.hasAuthIssue()) {
+ *result =
+ Status(ErrorCodes::Unauthorized,
+ str::stream() << "not authorized to communicate with " << target.toString());
+ return;
+ }
+ if (hbdata.getHealth() == 0) {
+ *result =
+ Status(ErrorCodes::HostUnreachable,
+ str::stream() << "I cannot reach the requested member: " << target.toString());
+ return;
+ }
+ if (hbdata.getOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
+ warning() << "attempting to sync from " << target << ", but its latest opTime is "
+ << hbdata.getOpTime().getSecs() << " and ours is " << lastOpApplied.getSecs()
+ << " so this may not work";
+ response->append("warning",
+ str::stream() << "requested member \"" << target.toString()
+ << "\" is more than 10 seconds behind us");
+ // not returning bad Status, just warning
+ }
+
+ HostAndPort prevSyncSource = getSyncSourceAddress();
+ if (!prevSyncSource.empty()) {
+ response->append("prevSyncTarget", prevSyncSource.toString());
+ }
+
+ setForceSyncSourceIndex(targetIndex);
+ *result = Status::OK();
+}
+
+void TopologyCoordinatorImpl::prepareFreshResponse(
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t now,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::ReplicaSetNotFound,
+ "Cannot participate in elections because not initialized");
+ return;
+ }
+
+ if (args.setName != _rsConfig.getReplSetName()) {
+ *result =
+ Status(ErrorCodes::ReplicaSetNotFound,
+ str::stream() << "Wrong repl set name. Expected: " << _rsConfig.getReplSetName()
+ << ", received: " << args.setName);
+ return;
+ }
+
+ if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
+ *result = Status(ErrorCodes::BadValue,
+ str::stream() << "Received replSetFresh command from member with the "
+ "same member ID as ourself: " << args.id);
+ return;
+ }
+
+ bool weAreFresher = false;
+ if (_rsConfig.getConfigVersion() > args.cfgver) {
+ log() << "replSet member " << args.who << " is not yet aware its cfg version "
+ << args.cfgver << " is stale";
+ response->append("info", "config version stale");
+ weAreFresher = true;
+ }
+ // check not only our own optime, but any other member we can reach
+ else if (OpTime(args.opTime, _term) < _latestKnownOpTime(lastOpApplied)) {
+ weAreFresher = true;
+ }
+ response->appendDate("opTime",
+ Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
+ response->append("fresher", weAreFresher);
+
+ std::string errmsg;
+ bool doVeto = _shouldVetoMember(args, now, lastOpApplied, &errmsg);
+ response->append("veto", doVeto);
+ if (doVeto) {
+ response->append("errmsg", errmsg);
+ }
+ *result = Status::OK();
+}
+
+bool TopologyCoordinatorImpl::_shouldVetoMember(
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t& now,
+ const OpTime& lastOpApplied,
+ std::string* errmsg) const {
+ if (_rsConfig.getConfigVersion() < args.cfgver) {
+ // We are stale; do not veto.
+ return false;
+ }
- const unsigned int memberID = args.id;
- const int hopefulIndex = _getMemberIndex(memberID);
- invariant(hopefulIndex != _selfIndex);
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
+ const unsigned int memberID = args.id;
+ const int hopefulIndex = _getMemberIndex(memberID);
+ invariant(hopefulIndex != _selfIndex);
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
- if (hopefulIndex == -1) {
- *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
- return true;
- }
+ if (hopefulIndex == -1) {
+ *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
+ return true;
+ }
- if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getOpTime()) {
- // hbinfo is not updated for ourself, so if we are primary we have to check the
- // primary's last optime separately
- *errmsg = str::stream() << "I am already primary, " <<
- _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
- " can try again once I've stepped down";
- return true;
- }
+ if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getOpTime()) {
+ // hbinfo is not updated for ourself, so if we are primary we have to check the
+ // primary's last optime separately
+ *errmsg = str::stream() << "I am already primary, "
+ << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " can try again once I've stepped down";
+ return true;
+ }
- if (_currentPrimaryIndex != -1 &&
- (hopefulIndex != _currentPrimaryIndex) &&
- (_hbdata[_currentPrimaryIndex].getOpTime() >=
- _hbdata[hopefulIndex].getOpTime())) {
- // other members might be aware of more up-to-date nodes
- *errmsg = str::stream() <<
- _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
- " is trying to elect itself but " <<
- _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() <<
- " is already primary and more up-to-date";
- return true;
- }
+ if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
+ (_hbdata[_currentPrimaryIndex].getOpTime() >= _hbdata[hopefulIndex].getOpTime())) {
+ // other members might be aware of more up-to-date nodes
+ *errmsg =
+ str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " is trying to elect itself but "
+ << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
+ << " is already primary and more up-to-date";
+ return true;
+ }
- if ((highestPriorityIndex != -1)) {
- const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
- const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
-
- if (priorityMember.getPriority() > hopefulMember.getPriority()) {
- *errmsg = str::stream()
- << hopefulMember.getHostAndPort().toString()
- << " has lower priority of " << hopefulMember.getPriority() << " than "
- << priorityMember.getHostAndPort().toString()
- << " which has a priority of " << priorityMember.getPriority();
- return true;
- }
- }
+ if ((highestPriorityIndex != -1)) {
+ const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
+ const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
- UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex, lastOpApplied);
- reason &= ~RefusesToStand;
- if (reason) {
- *errmsg = str::stream()
- << "I don't think "
- << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " is electable because the " << _getUnelectableReasonString(reason);
+ if (priorityMember.getPriority() > hopefulMember.getPriority()) {
+ *errmsg = str::stream() << hopefulMember.getHostAndPort().toString()
+ << " has lower priority of " << hopefulMember.getPriority()
+ << " than " << priorityMember.getHostAndPort().toString()
+ << " which has a priority of " << priorityMember.getPriority();
return true;
}
-
- return false;
}
- // produce a reply to a received electCmd
- void TopologyCoordinatorImpl::prepareElectResponse(
- const ReplicationCoordinator::ReplSetElectArgs& args,
- const Date_t now,
- const OpTime& lastOpApplied,
- BSONObjBuilder* response,
- Status* result) {
-
- if (_selfIndex == -1) {
- *result = Status(ErrorCodes::ReplicaSetNotFound,
- "Cannot participate in election because not initialized");
- return;
- }
-
- const long long myver = _rsConfig.getConfigVersion();
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
-
- const MemberConfig* primary = _currentPrimaryMember();
- const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
- const MemberConfig* highestPriority = highestPriorityIndex == -1 ? NULL :
- &_rsConfig.getMemberAt(highestPriorityIndex);
-
- int vote = 0;
- if (args.set != _rsConfig.getReplSetName()) {
- log() << "replSet error received an elect request for '" << args.set
- << "' but our set name is '" <<
- _rsConfig.getReplSetName() << "'";
- }
- else if ( myver < args.cfgver ) {
- // we are stale. don't vote
- log() << "replSetElect not voting because our config version is stale. Our version: " <<
- myver << ", their version: " << args.cfgver;
- }
- else if ( myver > args.cfgver ) {
- // they are stale!
- log() << "replSetElect command received stale config version # during election. "
- "Our version: " << myver << ", their version: " << args.cfgver;
- vote = -10000;
- }
- else if (!hopeful) {
- log() << "replSetElect couldn't find member with id " << args.whoid;
- vote = -10000;
- }
- else if (_iAmPrimary()) {
- log() << "I am already primary, " << hopeful->getHostAndPort().toString()
- << " can try again once I've stepped down";
- vote = -10000;
- }
- else if (primary) {
- log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
- << primary->getHostAndPort().toString() << " is already primary";
- vote = -10000;
- }
- else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
- // TODO(spencer): What if the lower-priority member is more up-to-date?
- log() << hopeful->getHostAndPort().toString() << " has lower priority than "
- << highestPriority->getHostAndPort().toString();
- vote = -10000;
- }
- else if (_voteLease.when + VoteLease::leaseTime >= now &&
- _voteLease.whoId != args.whoid) {
- log() << "replSet voting no for "
- << hopeful->getHostAndPort().toString()
- << "; voted for " << _voteLease.whoHostAndPort.toString() << ' '
- << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
- }
- else {
- _voteLease.when = now;
- _voteLease.whoId = args.whoid;
- _voteLease.whoHostAndPort = hopeful->getHostAndPort();
- vote = _selfConfig().getNumVotes();
- invariant(hopeful->getId() == args.whoid);
- if (vote > 0) {
- log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString()
- << " (" << args.whoid << ')';
- }
- }
-
- response->append("vote", vote);
- response->append("round", args.round);
- *result = Status::OK();
+ UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex, lastOpApplied);
+ reason &= ~RefusesToStand;
+ if (reason) {
+ *errmsg = str::stream() << "I don't think "
+ << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " is electable because the "
+ << _getUnelectableReasonString(reason);
+ return true;
}
- // produce a reply to a heartbeat
- Status TopologyCoordinatorImpl::prepareHeartbeatResponse(
- Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- const OpTime& lastOpApplied,
- ReplSetHeartbeatResponse* response) {
-
- if (args.getProtocolVersion() != 1) {
+ return false;
+}
+
+// produce a reply to a received electCmd
+void TopologyCoordinatorImpl::prepareElectResponse(
+ const ReplicationCoordinator::ReplSetElectArgs& args,
+ const Date_t now,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (_selfIndex == -1) {
+ *result = Status(ErrorCodes::ReplicaSetNotFound,
+ "Cannot participate in election because not initialized");
+ return;
+ }
+
+ const long long myver = _rsConfig.getConfigVersion();
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
+
+ const MemberConfig* primary = _currentPrimaryMember();
+ const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
+ const MemberConfig* highestPriority =
+ highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex);
+
+ int vote = 0;
+ if (args.set != _rsConfig.getReplSetName()) {
+ log() << "replSet error received an elect request for '" << args.set
+ << "' but our set name is '" << _rsConfig.getReplSetName() << "'";
+ } else if (myver < args.cfgver) {
+ // we are stale. don't vote
+ log() << "replSetElect not voting because our config version is stale. Our version: "
+ << myver << ", their version: " << args.cfgver;
+ } else if (myver > args.cfgver) {
+ // they are stale!
+ log() << "replSetElect command received stale config version # during election. "
+ "Our version: " << myver << ", their version: " << args.cfgver;
+ vote = -10000;
+ } else if (!hopeful) {
+ log() << "replSetElect couldn't find member with id " << args.whoid;
+ vote = -10000;
+ } else if (_iAmPrimary()) {
+ log() << "I am already primary, " << hopeful->getHostAndPort().toString()
+ << " can try again once I've stepped down";
+ vote = -10000;
+ } else if (primary) {
+ log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
+ << primary->getHostAndPort().toString() << " is already primary";
+ vote = -10000;
+ } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
+ // TODO(spencer): What if the lower-priority member is more up-to-date?
+ log() << hopeful->getHostAndPort().toString() << " has lower priority than "
+ << highestPriority->getHostAndPort().toString();
+ vote = -10000;
+ } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) {
+ log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for "
+ << _voteLease.whoHostAndPort.toString() << ' '
+ << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
+ } else {
+ _voteLease.when = now;
+ _voteLease.whoId = args.whoid;
+ _voteLease.whoHostAndPort = hopeful->getHostAndPort();
+ vote = _selfConfig().getNumVotes();
+ invariant(hopeful->getId() == args.whoid);
+ if (vote > 0) {
+ log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " ("
+ << args.whoid << ')';
+ }
+ }
+
+ response->append("vote", vote);
+ response->append("round", args.round);
+ *result = Status::OK();
+}
+
+// produce a reply to a heartbeat
+Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
+ const ReplSetHeartbeatArgs& args,
+ const std::string& ourSetName,
+ const OpTime& lastOpApplied,
+ ReplSetHeartbeatResponse* response) {
+ if (args.getProtocolVersion() != 1) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "replset: incompatible replset protocol version: "
+ << args.getProtocolVersion());
+ }
+
+ // Verify that replica set names match
+ const std::string rshb = args.getSetName();
+ if (ourSetName != rshb) {
+ log() << "replSet set names do not match, ours: " << ourSetName
+ << "; remote node's: " << rshb;
+ response->noteMismatched();
+ return Status(ErrorCodes::InconsistentReplicaSetNames,
+ str::stream() << "Our set name of " << ourSetName << " does not match name "
+ << rshb << " reported by remote node");
+ }
+
+ const MemberState myState = getMemberState();
+ if (_selfIndex == -1) {
+ if (myState.removed()) {
+ return Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set configuration is invalid or does not include us");
+ }
+ } else {
+ invariant(_rsConfig.getReplSetName() == args.getSetName());
+ if (args.getSenderId() == _selfConfig().getId()) {
return Status(ErrorCodes::BadValue,
- str::stream() << "replset: incompatible replset protocol version: "
- << args.getProtocolVersion());
- }
-
- // Verify that replica set names match
- const std::string rshb = args.getSetName();
- if (ourSetName != rshb) {
- log() << "replSet set names do not match, ours: " << ourSetName <<
- "; remote node's: " << rshb;
- response->noteMismatched();
- return Status(ErrorCodes::InconsistentReplicaSetNames, str::stream() <<
- "Our set name of " << ourSetName << " does not match name " << rshb <<
- " reported by remote node");
- }
-
- const MemberState myState = getMemberState();
- if (_selfIndex == -1) {
- if (myState.removed()) {
- return Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set configuration is invalid or does not include us");
- }
+ str::stream() << "Received heartbeat from member with the same "
+ "member ID as ourself: " << args.getSenderId());
}
- else {
- invariant(_rsConfig.getReplSetName() == args.getSetName());
- if (args.getSenderId() == _selfConfig().getId()) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "Received heartbeat from member with the same "
- "member ID as ourself: " << args.getSenderId());
- }
- }
-
- // This is a replica set
- response->noteReplSet();
-
- response->setSetName(ourSetName);
- response->setState(myState.s);
- if (myState.primary()) {
- response->setElectionTime(_electionTime);
- }
-
- // Are we electable
- response->setElectable(!_getMyUnelectableReason(now, lastOpApplied));
+ }
- // Heartbeat status message
- response->setHbMsg(_getHbmsg(now));
- response->setTime(duration_cast<Seconds>(now - Date_t{}));
- response->setOpTime(lastOpApplied);
+ // This is a replica set
+ response->noteReplSet();
- if (!_syncSource.empty()) {
- response->setSyncingTo(_syncSource);
- }
-
- if (!_rsConfig.isInitialized()) {
- response->setConfigVersion(-2);
- return Status::OK();
- }
+ response->setSetName(ourSetName);
+ response->setState(myState.s);
+ if (myState.primary()) {
+ response->setElectionTime(_electionTime);
+ }
- const long long v = _rsConfig.getConfigVersion();
- response->setConfigVersion(v);
- // Deliver new config if caller's version is older than ours
- if (v > args.getConfigVersion()) {
- response->setConfig(_rsConfig);
- }
+ // Are we electable
+ response->setElectable(!_getMyUnelectableReason(now, lastOpApplied));
- // Resolve the caller's id in our Member list
- int from = -1;
- if (v == args.getConfigVersion() && args.getSenderId() != -1) {
- from = _getMemberIndex(args.getSenderId());
- }
- if (from == -1) {
- // Can't find the member, so we leave out the stateDisagreement field
- return Status::OK();
- }
- invariant(from != _selfIndex);
+ // Heartbeat status message
+ response->setHbMsg(_getHbmsg(now));
+ response->setTime(duration_cast<Seconds>(now - Date_t{}));
+ response->setOpTime(lastOpApplied);
- // if we thought that this node is down, let it know
- if (!_hbdata[from].up()) {
- response->noteStateDisagreement();
- }
+ if (!_syncSource.empty()) {
+ response->setSyncingTo(_syncSource);
+ }
- // note that we got a heartbeat from this node
- _hbdata[from].setLastHeartbeatRecv(now);
+ if (!_rsConfig.isInitialized()) {
+ response->setConfigVersion(-2);
return Status::OK();
}
- Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(
- Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- const OpTime& lastOpApplied,
- ReplSetHeartbeatResponse* response) {
-
- // Verify that replica set names match
- const std::string rshb = args.getSetName();
- if (ourSetName != rshb) {
- log() << "replSet set names do not match, ours: " << ourSetName <<
- "; remote node's: " << rshb;
- return Status(ErrorCodes::InconsistentReplicaSetNames, str::stream() <<
- "Our set name of " << ourSetName << " does not match name " << rshb <<
- " reported by remote node");
- }
+ const long long v = _rsConfig.getConfigVersion();
+ response->setConfigVersion(v);
+ // Deliver new config if caller's version is older than ours
+ if (v > args.getConfigVersion()) {
+ response->setConfig(_rsConfig);
+ }
- const MemberState myState = getMemberState();
- if (_selfIndex == -1) {
- if (myState.removed()) {
- return Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set configuration is invalid or does not include us");
- }
- }
- else {
- if (args.getSenderId() == _selfConfig().getId()) {
- return Status(ErrorCodes::BadValue,
- str::stream() << "Received heartbeat from member with the same "
- "member ID as ourself: " << args.getSenderId());
- }
+ // Resolve the caller's id in our Member list
+ int from = -1;
+ if (v == args.getConfigVersion() && args.getSenderId() != -1) {
+ from = _getMemberIndex(args.getSenderId());
+ }
+ if (from == -1) {
+ // Can't find the member, so we leave out the stateDisagreement field
+ return Status::OK();
+ }
+ invariant(from != _selfIndex);
+
+ // if we thought that this node is down, let it know
+ if (!_hbdata[from].up()) {
+ response->noteStateDisagreement();
+ }
+
+ // note that we got a heartbeat from this node
+ _hbdata[from].setLastHeartbeatRecv(now);
+ return Status::OK();
+}
+
+Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
+ const ReplSetHeartbeatArgsV1& args,
+ const std::string& ourSetName,
+ const OpTime& lastOpApplied,
+ ReplSetHeartbeatResponse* response) {
+ // Verify that replica set names match
+ const std::string rshb = args.getSetName();
+ if (ourSetName != rshb) {
+ log() << "replSet set names do not match, ours: " << ourSetName
+ << "; remote node's: " << rshb;
+ return Status(ErrorCodes::InconsistentReplicaSetNames,
+ str::stream() << "Our set name of " << ourSetName << " does not match name "
+ << rshb << " reported by remote node");
+ }
+
+ const MemberState myState = getMemberState();
+ if (_selfIndex == -1) {
+ if (myState.removed()) {
+ return Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set configuration is invalid or does not include us");
+ }
+ } else {
+ if (args.getSenderId() == _selfConfig().getId()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Received heartbeat from member with the same "
+ "member ID as ourself: " << args.getSenderId());
}
+ }
- response->setSetName(ourSetName);
-
- response->setState(myState.s);
-
- response->setOpTime(lastOpApplied);
+ response->setSetName(ourSetName);
- if (_currentPrimaryIndex != -1) {
- response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
- }
+ response->setState(myState.s);
- response->setTerm(_term);
+ response->setOpTime(lastOpApplied);
- if (!_syncSource.empty()) {
- response->setSyncingTo(_syncSource);
- }
-
- if (!_rsConfig.isInitialized()) {
- response->setConfigVersion(-2);
- return Status::OK();
- }
+ if (_currentPrimaryIndex != -1) {
+ response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
+ }
- const long long v = _rsConfig.getConfigVersion();
- response->setConfigVersion(v);
- // Deliver new config if caller's version is older than ours
- if (v > args.getConfigVersion()) {
- response->setConfig(_rsConfig);
- }
+ response->setTerm(_term);
- // Resolve the caller's id in our Member list
- int from = -1;
- if (v == args.getConfigVersion() && args.getSenderId() != -1) {
- from = _getMemberIndex(args.getSenderId());
- }
- if (from == -1) {
- return Status::OK();
- }
- invariant(from != _selfIndex);
+ if (!_syncSource.empty()) {
+ response->setSyncingTo(_syncSource);
+ }
- // note that we got a heartbeat from this node
- _hbdata[from].setLastHeartbeatRecv(now);
+ if (!_rsConfig.isInitialized()) {
+ response->setConfigVersion(-2);
return Status::OK();
}
- int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
- int index = 0;
- for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
- it != _rsConfig.membersEnd();
- ++it, ++index) {
- if (it->getId() == id) {
- return index;
- }
- }
- return -1;
+ const long long v = _rsConfig.getConfigVersion();
+ response->setConfigVersion(v);
+ // Deliver new config if caller's version is older than ours
+ if (v > args.getConfigVersion()) {
+ response->setConfig(_rsConfig);
}
- std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest(
- Date_t now,
- const std::string& ourSetName,
- const HostAndPort& target) {
-
- PingStats& hbStats = _pings[target];
- Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
- if (!_rsConfig.isInitialized() ||
- (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
- (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
-
- // This is either the first request ever for "target", or the heartbeat timeout has
- // passed, so we're starting a "new" heartbeat.
- hbStats.start(now);
- alreadyElapsed = Milliseconds(0);
- }
- ReplSetHeartbeatArgs hbArgs;
- hbArgs.setProtocolVersion(1);
- hbArgs.setCheckEmpty(false);
- if (_rsConfig.isInitialized()) {
- hbArgs.setSetName(_rsConfig.getReplSetName());
- hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
- if (_selfIndex >= 0) {
- const MemberConfig& me = _selfConfig();
- hbArgs.setSenderHost(me.getHostAndPort());
- hbArgs.setSenderId(me.getId());
- }
- }
- else {
- hbArgs.setSetName(ourSetName);
- hbArgs.setConfigVersion(-2);
- }
-
- const Milliseconds timeoutPeriod(
- _rsConfig.isInitialized() ?
- _rsConfig.getHeartbeatTimeoutPeriodMillis() :
- ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod);
- const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
- return std::make_pair(hbArgs, timeout);
- }
-
- std::pair<ReplSetHeartbeatArgsV1, Milliseconds>
- TopologyCoordinatorImpl::prepareHeartbeatRequestV1(
- Date_t now,
- const std::string& ourSetName,
- const HostAndPort& target) {
-
- PingStats& hbStats = _pings[target];
- Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
- if (!_rsConfig.isInitialized() ||
- (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
- (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
-
- // This is either the first request ever for "target", or the heartbeat timeout has
- // passed, so we're starting a "new" heartbeat.
- hbStats.start(now);
- alreadyElapsed = Milliseconds(0);
- }
- ReplSetHeartbeatArgsV1 hbArgs;
+ // Resolve the caller's id in our Member list
+ int from = -1;
+ if (v == args.getConfigVersion() && args.getSenderId() != -1) {
+ from = _getMemberIndex(args.getSenderId());
+ }
+ if (from == -1) {
+ return Status::OK();
+ }
+ invariant(from != _selfIndex);
+
+ // note that we got a heartbeat from this node
+ _hbdata[from].setLastHeartbeatRecv(now);
+ return Status::OK();
+}
+
+int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
+ int index = 0;
+ for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
+ it != _rsConfig.membersEnd();
+ ++it, ++index) {
+ if (it->getId() == id) {
+ return index;
+ }
+ }
+ return -1;
+}
+
+std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest(
+ Date_t now, const std::string& ourSetName, const HostAndPort& target) {
+ PingStats& hbStats = _pings[target];
+ Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
+ if (!_rsConfig.isInitialized() ||
+ (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
+ (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
+ // This is either the first request ever for "target", or the heartbeat timeout has
+ // passed, so we're starting a "new" heartbeat.
+ hbStats.start(now);
+ alreadyElapsed = Milliseconds(0);
+ }
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setCheckEmpty(false);
+ if (_rsConfig.isInitialized()) {
hbArgs.setSetName(_rsConfig.getReplSetName());
hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
if (_selfIndex >= 0) {
const MemberConfig& me = _selfConfig();
- hbArgs.setSenderId(me.getId());
hbArgs.setSenderHost(me.getHostAndPort());
+ hbArgs.setSenderId(me.getId());
}
- hbArgs.setTerm(_term);
-
- const Milliseconds timeoutPeriod(
- _rsConfig.isInitialized() ?
- _rsConfig.getHeartbeatTimeoutPeriodMillis() :
- Milliseconds(
- ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod.count()));
- const Milliseconds timeout(timeoutPeriod.count() - alreadyElapsed.count());
- return std::make_pair(hbArgs, timeout);
- }
-
- HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
- Date_t now,
- Milliseconds networkRoundTripTime,
- const HostAndPort& target,
- const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
- const OpTime& myLastOpApplied) {
-
- const MemberState originalState = getMemberState();
- PingStats& hbStats = _pings[target];
- invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
- if (!hbResponse.isOK()) {
- hbStats.miss();
- }
- else {
- hbStats.hit(networkRoundTripTime.count());
- // Log diagnostics.
- if (hbResponse.getValue().isStateDisagreement()) {
- LOG(1) << target <<
- " thinks that we are down because they cannot send us heartbeats.";
- }
- }
-
- const bool isUnauthorized =
- (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
- (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
-
- const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
- Date_t nextHeartbeatStartDate;
- // determine next start time
- if (_rsConfig.isInitialized() &&
- (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries) &&
- (alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
-
- if (isUnauthorized) {
- nextHeartbeatStartDate = now + kHeartbeatInterval;
- } else {
- nextHeartbeatStartDate = now;
- }
- }
- else {
+ } else {
+ hbArgs.setSetName(ourSetName);
+ hbArgs.setConfigVersion(-2);
+ }
+
+ const Milliseconds timeoutPeriod(_rsConfig.isInitialized()
+ ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
+ : ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod);
+ const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
+ return std::make_pair(hbArgs, timeout);
+}
+
+std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1(
+ Date_t now, const std::string& ourSetName, const HostAndPort& target) {
+ PingStats& hbStats = _pings[target];
+ Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
+ if (!_rsConfig.isInitialized() ||
+ (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries) ||
+ (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
+ // This is either the first request ever for "target", or the heartbeat timeout has
+ // passed, so we're starting a "new" heartbeat.
+ hbStats.start(now);
+ alreadyElapsed = Milliseconds(0);
+ }
+ ReplSetHeartbeatArgsV1 hbArgs;
+ hbArgs.setSetName(_rsConfig.getReplSetName());
+ hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
+ if (_selfIndex >= 0) {
+ const MemberConfig& me = _selfConfig();
+ hbArgs.setSenderId(me.getId());
+ hbArgs.setSenderHost(me.getHostAndPort());
+ }
+ hbArgs.setTerm(_term);
+
+ const Milliseconds timeoutPeriod(
+ _rsConfig.isInitialized()
+ ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
+ : Milliseconds(ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod.count()));
+ const Milliseconds timeout(timeoutPeriod.count() - alreadyElapsed.count());
+ return std::make_pair(hbArgs, timeout);
+}
+
+HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
+ Date_t now,
+ Milliseconds networkRoundTripTime,
+ const HostAndPort& target,
+ const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+ const OpTime& myLastOpApplied) {
+ const MemberState originalState = getMemberState();
+ PingStats& hbStats = _pings[target];
+ invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
+ if (!hbResponse.isOK()) {
+ hbStats.miss();
+ } else {
+ hbStats.hit(networkRoundTripTime.count());
+ // Log diagnostics.
+ if (hbResponse.getValue().isStateDisagreement()) {
+ LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
+ }
+ }
+
+ const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
+ (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
+
+ const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
+ Date_t nextHeartbeatStartDate;
+ // determine next start time
+ if (_rsConfig.isInitialized() &&
+ (hbStats.getNumFailuresSinceLastStart() <= kMaxHeartbeatRetries) &&
+ (alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
+ if (isUnauthorized) {
nextHeartbeatStartDate = now + kHeartbeatInterval;
+ } else {
+ nextHeartbeatStartDate = now;
}
+ } else {
+ nextHeartbeatStartDate = now + kHeartbeatInterval;
+ }
- if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
- const long long currentConfigVersion =
- _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
- const ReplicaSetConfig& newConfig = hbResponse.getValue().getConfig();
- if (newConfig.getConfigVersion() > currentConfigVersion) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- }
- else {
- // Could be we got the newer version before we got the response, or the
- // target erroneously sent us one, even through it isn't newer.
- if (newConfig.getConfigVersion() < currentConfigVersion) {
- LOG(1) << "Config version from heartbeat was older than ours.";
- }
- else {
- LOG(2) << "Config from heartbeat response was same as ours.";
- }
- if (logger::globalLogDomain()->shouldLog(
- MongoLogDefaultComponent_component,
- ::mongo::LogstreamBuilder::severityCast(2))) {
- LogstreamBuilder lsb = log();
- if (_rsConfig.isInitialized()) {
- lsb << "Current config: " << _rsConfig.toBSON() << "; ";
- }
- lsb << "Config in heartbeat: " << newConfig.toBSON();
- }
- }
- }
-
- // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
- // so return early.
- if (!_rsConfig.isInitialized()) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
+ const long long currentConfigVersion =
+ _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
+ const ReplicaSetConfig& newConfig = hbResponse.getValue().getConfig();
+ if (newConfig.getConfigVersion() > currentConfigVersion) {
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
- }
- const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
- if (memberIndex == -1) {
- LOG(1) << "Could not find " << target << " in current config so ignoring --"
- " current config: " << _rsConfig.toBSON();
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
- }
- invariant(memberIndex != _selfIndex);
-
- MemberHeartbeatData& hbData = _hbdata[memberIndex];
- const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
- if (!hbResponse.isOK()) {
- if (isUnauthorized) {
- LOG(1) << "setAuthIssue: heartbeat response failed due to authentication"
- " issue for member _id:" << member.getId();
- hbData.setAuthIssue(now);
- }
- else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries ||
- alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis()) {
-
- LOG(1) << "setDownValues: heartbeat response failed for member _id:"
- << member.getId() << ", msg: "
- << hbResponse.getStatus().reason();
-
- hbData.setDownValues(now, hbResponse.getStatus().reason());
+ } else {
+ // Could be we got the newer version before we got the response, or the
+ // target erroneously sent us one, even through it isn't newer.
+ if (newConfig.getConfigVersion() < currentConfigVersion) {
+ LOG(1) << "Config version from heartbeat was older than ours.";
+ } else {
+ LOG(2) << "Config from heartbeat response was same as ours.";
}
- else {
- LOG(3) << "Bad heartbeat response from " << target <<
- "; trying again; Retries left: " <<
- (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) <<
- "; " << alreadyElapsed.count() << "ms have already elapsed";
+ if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
+ ::mongo::LogstreamBuilder::severityCast(2))) {
+ LogstreamBuilder lsb = log();
+ if (_rsConfig.isInitialized()) {
+ lsb << "Current config: " << _rsConfig.toBSON() << "; ";
+ }
+ lsb << "Config in heartbeat: " << newConfig.toBSON();
}
}
- else {
- ReplSetHeartbeatResponse hbr = hbResponse.getValue();
- LOG(3) << "setUpValues: heartbeat response good for member _id:"
- << member.getId() << ", msg: "
- << hbr.getHbMsg();
- hbData.setUpValues(now, member.getHostAndPort(), hbr);
- }
- HeartbeatResponseAction nextAction = _updateHeartbeatDataImpl(
- memberIndex,
- originalState,
- now,
- myLastOpApplied);
+ }
+ // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
+ // so return early.
+ if (!_rsConfig.isInitialized()) {
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
}
+ const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
+ if (memberIndex == -1) {
+ LOG(1) << "Could not find " << target << " in current config so ignoring --"
+ " current config: " << _rsConfig.toBSON();
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+ }
+ invariant(memberIndex != _selfIndex);
+
+ MemberHeartbeatData& hbData = _hbdata[memberIndex];
+ const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
+ if (!hbResponse.isOK()) {
+ if (isUnauthorized) {
+ LOG(1) << "setAuthIssue: heartbeat response failed due to authentication"
+ " issue for member _id:" << member.getId();
+ hbData.setAuthIssue(now);
+ } else if (hbStats.getNumFailuresSinceLastStart() > kMaxHeartbeatRetries ||
+ alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis()) {
+ LOG(1) << "setDownValues: heartbeat response failed for member _id:" << member.getId()
+ << ", msg: " << hbResponse.getStatus().reason();
+
+ hbData.setDownValues(now, hbResponse.getStatus().reason());
+ } else {
+ LOG(3) << "Bad heartbeat response from " << target << "; trying again; Retries left: "
+ << (kMaxHeartbeatRetries - hbStats.getNumFailuresSinceLastStart()) << "; "
+ << alreadyElapsed.count() << "ms have already elapsed";
+ }
+ } else {
+ ReplSetHeartbeatResponse hbr = hbResponse.getValue();
+ LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
+ << ", msg: " << hbr.getHbMsg();
+ hbData.setUpValues(now, member.getHostAndPort(), hbr);
+ }
+ HeartbeatResponseAction nextAction =
+ _updateHeartbeatDataImpl(memberIndex, originalState, now, myLastOpApplied);
+
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+ return nextAction;
+}
+
+HeartbeatResponseAction TopologyCoordinatorImpl::_updateHeartbeatDataImpl(
+ int updatedConfigIndex,
+ const MemberState& originalState,
+ Date_t now,
+ const OpTime& lastOpApplied) {
+ // This method has two interrelated responsibilities, performed in two phases.
+ //
+ // First, it updates the local notion of which remote node, if any is primary. In the
+ // process, it may request a remote primary to step down because there is a higher priority
+ // node waiting, or because the local node thinks it is primary and that it has a more
+ // recent electionTime. It may instead decide that the local node should step down itself,
+ // because a remote has a more recent election time.
+ //
+ // Second, if there is no remote primary, and the local node is not primary, it considers
+ // whether or not to stand for election.
+ invariant(updatedConfigIndex != _selfIndex);
+
+ // We are missing from the config, so do not participate in primary maintenance or election.
+ if (_selfIndex == -1) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ ////////////////////
+ // Phase 1
+ ////////////////////
+
+ // If we believe the node whose data was just updated is primary, confirm that
+ // the updated data supports that notion. If not, erase our notion of who is primary.
+ if (updatedConfigIndex == _currentPrimaryIndex) {
+ const MemberHeartbeatData& updatedHBData = _hbdata[updatedConfigIndex];
+ if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
+ _currentPrimaryIndex = -1;
+ }
+ }
+
+ // If the current primary is not highest priority and up to date (within 10s),
+ // have them/me stepdown.
+ if (_currentPrimaryIndex != -1) {
+ // check if we should ask the primary (possibly ourselves) to step down
+ const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
+ if (highestPriorityIndex != -1) {
+ const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
+ const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
+ const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
+ ? lastOpApplied
+ : _hbdata[highestPriorityIndex].getOpTime();
- HeartbeatResponseAction TopologyCoordinatorImpl::_updateHeartbeatDataImpl(
- int updatedConfigIndex,
- const MemberState& originalState,
- Date_t now,
- const OpTime& lastOpApplied) {
-
- // This method has two interrelated responsibilities, performed in two phases.
- //
- // First, it updates the local notion of which remote node, if any is primary. In the
- // process, it may request a remote primary to step down because there is a higher priority
- // node waiting, or because the local node thinks it is primary and that it has a more
- // recent electionTime. It may instead decide that the local node should step down itself,
- // because a remote has a more recent election time.
- //
- // Second, if there is no remote primary, and the local node is not primary, it considers
- // whether or not to stand for election.
- invariant(updatedConfigIndex != _selfIndex);
-
- // We are missing from the config, so do not participate in primary maintenance or election.
- if (_selfIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
-
- ////////////////////
- // Phase 1
- ////////////////////
-
- // If we believe the node whose data was just updated is primary, confirm that
- // the updated data supports that notion. If not, erase our notion of who is primary.
- if (updatedConfigIndex == _currentPrimaryIndex) {
- const MemberHeartbeatData& updatedHBData = _hbdata[updatedConfigIndex];
- if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
- _currentPrimaryIndex = -1;
- }
- }
+ if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
+ _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime, lastOpApplied)) {
+ const OpTime latestOpTime = _latestKnownOpTime(lastOpApplied);
- // If the current primary is not highest priority and up to date (within 10s),
- // have them/me stepdown.
- if (_currentPrimaryIndex != -1) {
- // check if we should ask the primary (possibly ourselves) to step down
- const int highestPriorityIndex = _getHighestPriorityElectableIndex(now, lastOpApplied);
- if (highestPriorityIndex != -1) {
- const MemberConfig& currentPrimaryMember =
- _rsConfig.getMemberAt(_currentPrimaryIndex);
- const MemberConfig& highestPriorityMember =
- _rsConfig.getMemberAt(highestPriorityIndex);
- const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex ?
- lastOpApplied : _hbdata[highestPriorityIndex].getOpTime();
-
- if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
- _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime,
- lastOpApplied)) {
- const OpTime latestOpTime = _latestKnownOpTime(lastOpApplied);
-
- if (_iAmPrimary()) {
- if (_stepDownPending) {
- return HeartbeatResponseAction::makeNoAction();
- }
- _stepDownPending = true;
- log() << "Stepping down self (priority "
- << currentPrimaryMember.getPriority() << ") because "
- << highestPriorityMember.getHostAndPort() << " has higher priority "
- << highestPriorityMember.getPriority() << " and is only "
- << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
- << " seconds behind me";
- const Date_t until = now + VoteLease::leaseTime + kHeartbeatInterval;
- if (_electionSleepUntil < until) {
- _electionSleepUntil = until;
- }
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ if (_iAmPrimary()) {
+ if (_stepDownPending) {
+ return HeartbeatResponseAction::makeNoAction();
}
- else if ((highestPriorityIndex == _selfIndex) &&
- (_electionSleepUntil <= now)) {
- // If this node is the highest priority node, and it is not in
- // an inter-election sleep period, ask the current primary to step down.
- // This is an optimization, because the remote primary will almost certainly
- // notice this node's electability promptly, via its own heartbeat process.
- log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
- << " (priority " << currentPrimaryMember.getPriority()
- << ") step down because I have higher priority "
- << highestPriorityMember.getPriority() << " and am only "
- << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
- << " seconds behind it";
- int primaryIndex = _currentPrimaryIndex;
- _currentPrimaryIndex = -1;
- return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
+ _stepDownPending = true;
+ log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
+ << ") because " << highestPriorityMember.getHostAndPort()
+ << " has higher priority " << highestPriorityMember.getPriority()
+ << " and is only "
+ << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
+ << " seconds behind me";
+ const Date_t until = now + VoteLease::leaseTime + kHeartbeatInterval;
+ if (_electionSleepUntil < until) {
+ _electionSleepUntil = until;
}
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
+ // If this node is the highest priority node, and it is not in
+ // an inter-election sleep period, ask the current primary to step down.
+ // This is an optimization, because the remote primary will almost certainly
+ // notice this node's electability promptly, via its own heartbeat process.
+ log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
+ << " (priority " << currentPrimaryMember.getPriority()
+ << ") step down because I have higher priority "
+ << highestPriorityMember.getPriority() << " and am only "
+ << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
+ << " seconds behind it";
+ int primaryIndex = _currentPrimaryIndex;
+ _currentPrimaryIndex = -1;
+ return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
}
}
}
+ }
- // Scan the member list's heartbeat data for who is primary, and update
- // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
- {
- int remotePrimaryIndex = -1;
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- if (itIndex == _selfIndex) {
- continue;
- }
-
- if( it->getState().primary() && it->up() ) {
- if (remotePrimaryIndex != -1) {
- // two other nodes think they are primary (asynchronously polled)
- // -- wait for things to settle down.
- warning() << "two remote primaries (transiently)";
- return HeartbeatResponseAction::makeNoAction();
- }
- remotePrimaryIndex = itIndex;
- }
+ // Scan the member list's heartbeat data for who is primary, and update
+ // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
+ {
+ int remotePrimaryIndex = -1;
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
+ it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ if (itIndex == _selfIndex) {
+ continue;
}
- if (remotePrimaryIndex != -1) {
- // If it's the same as last time, don't do anything further.
- if (_currentPrimaryIndex == remotePrimaryIndex) {
+ if (it->getState().primary() && it->up()) {
+ if (remotePrimaryIndex != -1) {
+ // two other nodes think they are primary (asynchronously polled)
+ // -- wait for things to settle down.
+ warning() << "two remote primaries (transiently)";
return HeartbeatResponseAction::makeNoAction();
}
- // Clear last heartbeat message on ourselves (why?)
- setMyHeartbeatMessage(now, "");
-
- // If we are also primary, this is a problem. Determine who should step down.
- if (_iAmPrimary()) {
- Timestamp remoteElectionTime = _hbdata[remotePrimaryIndex].getElectionTime();
- log() << "another primary seen with election time "
- << remoteElectionTime << " my election time is " << _electionTime;
-
- // Step down whomever has the older election time.
- if (remoteElectionTime > _electionTime) {
- if (_stepDownPending) {
- return HeartbeatResponseAction::makeNoAction();
- }
- _stepDownPending = true;
- log() << "stepping down; another primary was elected more recently";
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
- }
- else {
- log() << "another PRIMARY detected and it should step down"
- " since it was elected earlier than me";
- return HeartbeatResponseAction::makeStepDownRemoteAction(
- remotePrimaryIndex);
- }
- }
-
- _currentPrimaryIndex = remotePrimaryIndex;
- return HeartbeatResponseAction::makeNoAction();
+ remotePrimaryIndex = itIndex;
}
}
- ////////////////////
- // Phase 2
- ////////////////////
+ if (remotePrimaryIndex != -1) {
+ // If it's the same as last time, don't do anything further.
+ if (_currentPrimaryIndex == remotePrimaryIndex) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ // Clear last heartbeat message on ourselves (why?)
+ setMyHeartbeatMessage(now, "");
- // We do not believe any remote to be primary.
+ // If we are also primary, this is a problem. Determine who should step down.
+ if (_iAmPrimary()) {
+ Timestamp remoteElectionTime = _hbdata[remotePrimaryIndex].getElectionTime();
+ log() << "another primary seen with election time " << remoteElectionTime
+ << " my election time is " << _electionTime;
- // If we are primary, check if we can still see majority of the set;
- // stepdown if we can't.
- if (_iAmPrimary()) {
- if (CannotSeeMajority & _getMyUnelectableReason(now, lastOpApplied)) {
- if (_stepDownPending) {
- return HeartbeatResponseAction::makeNoAction();
+ // Step down whomever has the older election time.
+ if (remoteElectionTime > _electionTime) {
+ if (_stepDownPending) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ _stepDownPending = true;
+ log() << "stepping down; another primary was elected more recently";
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
+ } else {
+ log() << "another PRIMARY detected and it should step down"
+ " since it was elected earlier than me";
+ return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
}
- _stepDownPending = true;
- log() << "can't see a majority of the set, relinquishing primary";
- return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
}
- LOG(2) << "Choosing to remain primary";
+ _currentPrimaryIndex = remotePrimaryIndex;
return HeartbeatResponseAction::makeNoAction();
}
+ }
- fassert(18505, _currentPrimaryIndex == -1);
-
- const MemberState currentState = getMemberState();
- if (originalState.recovering() && currentState.secondary()) {
- // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
- // received a heartbeat with an auth error when previously all the heartbeats we'd
- // received had auth errors. In this case, don't return makeElectAction() because
- // that could cause the election to start before the ReplicationCoordinator has updated
- // its notion of the member state to SECONDARY. Instead return noAction so that the
- // ReplicationCooridinator knows to update its tracking of the member state off of the
- // TopologyCoordinator, and leave starting the election until the next heartbeat comes
- // back.
- return HeartbeatResponseAction::makeNoAction();
- }
+ ////////////////////
+ // Phase 2
+ ////////////////////
- // At this point, there is no primary anywhere. Check to see if we should become a
- // candidate.
- if (!checkShouldStandForElection(now, lastOpApplied)) {
- return HeartbeatResponseAction::makeNoAction();
+ // We do not believe any remote to be primary.
+
+ // If we are primary, check if we can still see majority of the set;
+ // stepdown if we can't.
+ if (_iAmPrimary()) {
+ if (CannotSeeMajority & _getMyUnelectableReason(now, lastOpApplied)) {
+ if (_stepDownPending) {
+ return HeartbeatResponseAction::makeNoAction();
+ }
+ _stepDownPending = true;
+ log() << "can't see a majority of the set, relinquishing primary";
+ return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
}
- return HeartbeatResponseAction::makeElectAction();
+
+ LOG(2) << "Choosing to remain primary";
+ return HeartbeatResponseAction::makeNoAction();
}
- bool TopologyCoordinatorImpl::checkShouldStandForElection(
- Date_t now, const OpTime& lastOpApplied) {
- if (_currentPrimaryIndex != -1) {
- return false;
- }
- invariant (_role != Role::leader);
+ fassert(18505, _currentPrimaryIndex == -1);
- if (_role == Role::candidate) {
- LOG(2) << "Not standing for election again; already candidate";
- return false;
- }
+ const MemberState currentState = getMemberState();
+ if (originalState.recovering() && currentState.secondary()) {
+ // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
+ // received a heartbeat with an auth error when previously all the heartbeats we'd
+ // received had auth errors. In this case, don't return makeElectAction() because
+ // that could cause the election to start before the ReplicationCoordinator has updated
+ // its notion of the member state to SECONDARY. Instead return noAction so that the
+ // ReplicationCooridinator knows to update its tracking of the member state off of the
+ // TopologyCoordinator, and leave starting the election until the next heartbeat comes
+ // back.
+ return HeartbeatResponseAction::makeNoAction();
+ }
- const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, lastOpApplied);
- if (NotCloseEnoughToLatestOptime & unelectableReason) {
- LOG(2) << "Not standing for election because " <<
- _getUnelectableReasonString(unelectableReason) << "; my last optime is " <<
- lastOpApplied << " and the newest is " << _latestKnownOpTime(lastOpApplied);
- return false;
- }
- if (unelectableReason) {
- LOG(2) << "Not standing for election because " <<
- _getUnelectableReasonString(unelectableReason);
- return false;
- }
- if (_electionSleepUntil > now) {
- LOG(2) << "Not standing for election before " <<
- dateToISOStringLocal(_electionSleepUntil) << " because I stood too recently";
- return false;
- }
- // All checks passed, become a candidate and start election proceedings.
- _role = Role::candidate;
- return true;
+ // At this point, there is no primary anywhere. Check to see if we should become a
+ // candidate.
+ if (!checkShouldStandForElection(now, lastOpApplied)) {
+ return HeartbeatResponseAction::makeNoAction();
}
+ return HeartbeatResponseAction::makeElectAction();
+}
- bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
- int vUp = 0;
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- if (itIndex == _selfIndex || it->up()) {
- vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
- }
- }
+bool TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now, const OpTime& lastOpApplied) {
+ if (_currentPrimaryIndex != -1) {
+ return false;
+ }
+ invariant(_role != Role::leader);
- return vUp * 2 > _rsConfig.getTotalVotingMembers();
+ if (_role == Role::candidate) {
+ LOG(2) << "Not standing for election again; already candidate";
+ return false;
}
- bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(
- const OpTime& otherOpTime, const OpTime& ourLastOpApplied) const {
- const OpTime latestKnownOpTime = _latestKnownOpTime(ourLastOpApplied);
- // Use addition instead of subtraction to avoid overflow.
- return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
+ const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, lastOpApplied);
+ if (NotCloseEnoughToLatestOptime & unelectableReason) {
+ LOG(2) << "Not standing for election because "
+ << _getUnelectableReasonString(unelectableReason) << "; my last optime is "
+ << lastOpApplied << " and the newest is " << _latestKnownOpTime(lastOpApplied);
+ return false;
+ }
+ if (unelectableReason) {
+ LOG(2) << "Not standing for election because "
+ << _getUnelectableReasonString(unelectableReason);
+ return false;
}
+ if (_electionSleepUntil > now) {
+ LOG(2) << "Not standing for election before " << dateToISOStringLocal(_electionSleepUntil)
+ << " because I stood too recently";
+ return false;
+ }
+ // All checks passed, become a candidate and start election proceedings.
+ _role = Role::candidate;
+ return true;
+}
- bool TopologyCoordinatorImpl::_iAmPrimary() const {
- if (_role == Role::leader) {
- invariant(_currentPrimaryIndex == _selfIndex);
- return true;
+bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
+ int vUp = 0;
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ if (itIndex == _selfIndex || it->up()) {
+ vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
}
- return false;
}
- OpTime TopologyCoordinatorImpl::_latestKnownOpTime(const OpTime& ourLastOpApplied) const {
- OpTime latest = ourLastOpApplied;
+ return vUp * 2 > _rsConfig.getTotalVotingMembers();
+}
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
+bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(
+ const OpTime& otherOpTime, const OpTime& ourLastOpApplied) const {
+ const OpTime latestKnownOpTime = _latestKnownOpTime(ourLastOpApplied);
+ // Use addition instead of subtraction to avoid overflow.
+ return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
+}
- if (indexOfIterator(_hbdata, it) == _selfIndex) {
- continue;
- }
- if (!it->up()) {
- continue;
- }
+bool TopologyCoordinatorImpl::_iAmPrimary() const {
+ if (_role == Role::leader) {
+ invariant(_currentPrimaryIndex == _selfIndex);
+ return true;
+ }
+ return false;
+}
- OpTime optime = it->getOpTime();
+OpTime TopologyCoordinatorImpl::_latestKnownOpTime(const OpTime& ourLastOpApplied) const {
+ OpTime latest = ourLastOpApplied;
- if (optime > latest) {
- latest = optime;
- }
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ if (indexOfIterator(_hbdata, it) == _selfIndex) {
+ continue;
}
+ if (!it->up()) {
+ continue;
+ }
+
+ OpTime optime = it->getOpTime();
- return latest;
+ if (optime > latest) {
+ latest = optime;
+ }
}
- bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
- int memberTwoIndex) const {
- if (memberOneIndex == -1)
- return false;
+ return latest;
+}
- if (memberTwoIndex == -1)
- return true;
+bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
+ int memberTwoIndex) const {
+ if (memberOneIndex == -1)
+ return false;
- return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
- _rsConfig.getMemberAt(memberTwoIndex).getPriority();
- }
+ if (memberTwoIndex == -1)
+ return true;
- int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(
- Date_t now, const OpTime& lastOpApplied) const {
- int maxIndex = -1;
- for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
- UnelectableReasonMask reason = currentIndex == _selfIndex ?
- _getMyUnelectableReason(now, lastOpApplied) :
- _getUnelectableReason(currentIndex, lastOpApplied);
- if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
- maxIndex = currentIndex;
- }
- }
+ return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
+ _rsConfig.getMemberAt(memberTwoIndex).getPriority();
+}
- return maxIndex;
+int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now,
+ const OpTime& lastOpApplied) const {
+ int maxIndex = -1;
+ for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
+ UnelectableReasonMask reason = currentIndex == _selfIndex
+ ? _getMyUnelectableReason(now, lastOpApplied)
+ : _getUnelectableReason(currentIndex, lastOpApplied);
+ if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
+ maxIndex = currentIndex;
+ }
}
- void TopologyCoordinatorImpl::prepareForStepDown() {
- _stepDownPending = true;
- }
+ return maxIndex;
+}
- void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState,
- const Timestamp& electionTime) {
- invariant(_selfIndex != -1);
- if (newMemberState == getMemberState())
- return;
- switch(newMemberState.s) {
+void TopologyCoordinatorImpl::prepareForStepDown() {
+ _stepDownPending = true;
+}
+
+void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState,
+ const Timestamp& electionTime) {
+ invariant(_selfIndex != -1);
+ if (newMemberState == getMemberState())
+ return;
+ switch (newMemberState.s) {
case MemberState::RS_PRIMARY:
_role = Role::candidate;
processWinElection(OID(), electionTime);
@@ -1374,728 +1298,692 @@ namespace {
}
break;
case MemberState::RS_STARTUP:
- updateConfig(
- ReplicaSetConfig(),
- -1,
- Date_t(),
- OpTime());
+ updateConfig(ReplicaSetConfig(), -1, Date_t(), OpTime());
break;
default:
severe() << "Cannot switch to state " << newMemberState;
invariant(false);
- }
- if (getMemberState() != newMemberState.s) {
- severe() << "Expected to enter state " << newMemberState << " but am now in " <<
- getMemberState();
- invariant(false);
- }
- log() << newMemberState;
}
-
- void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
- if (primaryIndex == _selfIndex) {
- changeMemberState_forTest(MemberState::RS_PRIMARY);
- }
- else {
- if (_iAmPrimary()) {
- changeMemberState_forTest(MemberState::RS_SECONDARY);
- }
- if (primaryIndex != -1) {
- ReplSetHeartbeatResponse hbResponse;
- hbResponse.setState(MemberState::RS_PRIMARY);
- hbResponse.setElectionTime(Timestamp());
- hbResponse.setOpTime(_hbdata[primaryIndex].getOpTime());
- hbResponse.setSyncingTo(HostAndPort());
- hbResponse.setHbMsg("");
- _hbdata[primaryIndex].setUpValues(
- _hbdata[primaryIndex].getLastHeartbeat(),
- _rsConfig.getMemberAt(primaryIndex).getHostAndPort(),
- hbResponse);
- }
- _currentPrimaryIndex = primaryIndex;
- }
+ if (getMemberState() != newMemberState.s) {
+ severe() << "Expected to enter state " << newMemberState << " but am now in "
+ << getMemberState();
+ invariant(false);
}
+ log() << newMemberState;
+}
- const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
- if (_currentPrimaryIndex == -1)
- return NULL;
-
- return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
- }
-
- void TopologyCoordinatorImpl::prepareStatusResponse(
- const ReplicationExecutor::CallbackArgs& data,
- Date_t now,
- unsigned selfUptime,
- const OpTime& lastOpApplied,
- BSONObjBuilder* response,
- Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- // output for each member
- vector<BSONObj> membersOut;
- const MemberState myState = getMemberState();
+void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
+ if (primaryIndex == _selfIndex) {
+ changeMemberState_forTest(MemberState::RS_PRIMARY);
+ } else {
+ if (_iAmPrimary()) {
+ changeMemberState_forTest(MemberState::RS_SECONDARY);
+ }
+ if (primaryIndex != -1) {
+ ReplSetHeartbeatResponse hbResponse;
+ hbResponse.setState(MemberState::RS_PRIMARY);
+ hbResponse.setElectionTime(Timestamp());
+ hbResponse.setOpTime(_hbdata[primaryIndex].getOpTime());
+ hbResponse.setSyncingTo(HostAndPort());
+ hbResponse.setHbMsg("");
+ _hbdata[primaryIndex].setUpValues(_hbdata[primaryIndex].getLastHeartbeat(),
+ _rsConfig.getMemberAt(primaryIndex).getHostAndPort(),
+ hbResponse);
+ }
+ _currentPrimaryIndex = primaryIndex;
+ }
+}
+
+const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
+ if (_currentPrimaryIndex == -1)
+ return NULL;
+
+ return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
+}
+
+void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
+ Date_t now,
+ unsigned selfUptime,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) {
+ if (data.status == ErrorCodes::CallbackCanceled) {
+ *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
+ return;
+ }
+
+ // output for each member
+ vector<BSONObj> membersOut;
+ const MemberState myState = getMemberState();
+
+ if (_selfIndex == -1) {
+ // We're REMOVED or have an invalid config
+ response->append("state", static_cast<int>(myState.s));
+ response->append("stateStr", myState.toString());
+ response->append("uptime", selfUptime);
+
+ BSONObjBuilder opTime(response->subobjStart("optime"));
+ opTime.append("ts", lastOpApplied.getTimestamp());
+ opTime.append("term", lastOpApplied.getTerm());
+ opTime.done();
+
+ response->appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
+ if (_maintenanceModeCalls) {
+ response->append("maintenanceMode", _maintenanceModeCalls);
+ }
+ std::string s = _getHbmsg(now);
+ if (!s.empty())
+ response->append("infoMessage", s);
+ *result = Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Our replica set config is invalid or we are not a member of it");
+ return;
+ }
+
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ if (itIndex == _selfIndex) {
+ // add self
+ BSONObjBuilder bb;
+ bb.append("_id", _selfConfig().getId());
+ bb.append("name", _selfConfig().getHostAndPort().toString());
+ bb.append("health", 1.0);
+ bb.append("state", static_cast<int>(myState.s));
+ bb.append("stateStr", myState.toString());
+ bb.append("uptime", selfUptime);
+ if (!_selfConfig().isArbiter()) {
+ BSONObjBuilder opTime(bb.subobjStart("optime"));
+ opTime.append("ts", lastOpApplied.getTimestamp());
+ opTime.append("term", lastOpApplied.getTerm());
+ opTime.done();
+
+ bb.appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
+ }
+
+ if (!_syncSource.empty() && !_iAmPrimary()) {
+ bb.append("syncingTo", _syncSource.toString());
+ }
- if (_selfIndex == -1) {
- // We're REMOVED or have an invalid config
- response->append("state", static_cast<int>(myState.s));
- response->append("stateStr", myState.toString());
- response->append("uptime", selfUptime);
-
- BSONObjBuilder opTime(response->subobjStart("optime"));
- opTime.append("ts", lastOpApplied.getTimestamp());
- opTime.append("term", lastOpApplied.getTerm());
- opTime.done();
-
- response->appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
if (_maintenanceModeCalls) {
- response->append("maintenanceMode", _maintenanceModeCalls);
+ bb.append("maintenanceMode", _maintenanceModeCalls);
}
- std::string s = _getHbmsg(now);
- if( !s.empty() )
- response->append("infoMessage", s);
- *result = Status(ErrorCodes::InvalidReplicaSetConfig,
- "Our replica set config is invalid or we are not a member of it");
- return;
- }
-
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- if (itIndex == _selfIndex) {
- // add self
- BSONObjBuilder bb;
- bb.append("_id", _selfConfig().getId());
- bb.append("name", _selfConfig().getHostAndPort().toString());
- bb.append("health", 1.0);
- bb.append("state", static_cast<int>(myState.s));
- bb.append("stateStr", myState.toString());
- bb.append("uptime", selfUptime);
- if (!_selfConfig().isArbiter()) {
- BSONObjBuilder opTime(bb.subobjStart("optime"));
- opTime.append("ts", lastOpApplied.getTimestamp());
- opTime.append("term", lastOpApplied.getTerm());
- opTime.done();
-
- bb.appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
- }
-
- if (!_syncSource.empty() && !_iAmPrimary()) {
- bb.append("syncingTo", _syncSource.toString());
- }
-
- if (_maintenanceModeCalls) {
- bb.append("maintenanceMode", _maintenanceModeCalls);
- }
-
- std::string s = _getHbmsg(now);
- if( !s.empty() )
- bb.append("infoMessage", s);
- if (myState.primary()) {
- bb.append("electionTime", _electionTime);
- bb.appendDate("electionDate",
- Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
- }
- bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
- bb.append("self", true);
- membersOut.push_back(bb.obj());
+ std::string s = _getHbmsg(now);
+ if (!s.empty())
+ bb.append("infoMessage", s);
+
+ if (myState.primary()) {
+ bb.append("electionTime", _electionTime);
+ bb.appendDate("electionDate",
+ Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
+ }
+ bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
+ bb.append("self", true);
+ membersOut.push_back(bb.obj());
+ } else {
+ // add non-self member
+ const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
+ BSONObjBuilder bb;
+ bb.append("_id", itConfig.getId());
+ bb.append("name", itConfig.getHostAndPort().toString());
+ double h = it->getHealth();
+ bb.append("health", h);
+ const MemberState state = it->getState();
+ bb.append("state", static_cast<int>(state.s));
+ if (h == 0) {
+ // if we can't connect the state info is from the past
+ // and could be confusing to show
+ bb.append("stateStr", "(not reachable/healthy)");
+ } else {
+ bb.append("stateStr", it->getState().toString());
+ }
+
+ const unsigned int uptime = static_cast<unsigned int>((
+ it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0));
+ bb.append("uptime", uptime);
+ if (!itConfig.isArbiter()) {
+ BSONObjBuilder opTime(bb.subobjStart("optime"));
+ opTime.append("ts", it->getOpTime().getTimestamp());
+ opTime.append("term", it->getOpTime().getTerm());
+ opTime.done();
+
+ bb.appendDate("optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(it->getOpTime().getSecs())));
+ }
+ bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
+ bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
+ const int ping = _getPing(itConfig.getHostAndPort());
+ if (ping != -1) {
+ bb.append("pingMs", ping);
+ std::string s = it->getLastHeartbeatMsg();
+ if (!s.empty())
+ bb.append("lastHeartbeatMessage", s);
+ }
+ if (it->hasAuthIssue()) {
+ bb.append("authenticated", false);
+ }
+ const HostAndPort& syncSource = it->getSyncSource();
+ if (!syncSource.empty() && !state.primary()) {
+ bb.append("syncingTo", syncSource.toString());
}
- else {
- // add non-self member
- const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
- BSONObjBuilder bb;
- bb.append("_id", itConfig.getId());
- bb.append("name", itConfig.getHostAndPort().toString());
- double h = it->getHealth();
- bb.append("health", h);
- const MemberState state = it->getState();
- bb.append("state", static_cast<int>(state.s));
- if( h == 0 ) {
- // if we can't connect the state info is from the past
- // and could be confusing to show
- bb.append("stateStr", "(not reachable/healthy)");
- }
- else {
- bb.append("stateStr", it->getState().toString());
- }
-
- const unsigned int uptime = static_cast<unsigned int>(
- (it->getUpSince() != Date_t()?
- durationCount<Seconds>(now - it->getUpSince()) :
- 0));
- bb.append("uptime", uptime);
- if (!itConfig.isArbiter()) {
- BSONObjBuilder opTime(bb.subobjStart("optime"));
- opTime.append("ts", it->getOpTime().getTimestamp());
- opTime.append("term", it->getOpTime().getTerm());
- opTime.done();
-
- bb.appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(it->getOpTime().getSecs())));
- }
- bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
- bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
- const int ping = _getPing(itConfig.getHostAndPort());
- if (ping != -1) {
- bb.append("pingMs", ping);
- std::string s = it->getLastHeartbeatMsg();
- if( !s.empty() )
- bb.append("lastHeartbeatMessage", s);
- }
- if (it->hasAuthIssue()) {
- bb.append("authenticated", false);
- }
- const HostAndPort& syncSource = it->getSyncSource();
- if (!syncSource.empty() && !state.primary()) {
- bb.append("syncingTo", syncSource.toString());
- }
- if (state == MemberState::RS_PRIMARY) {
- bb.append("electionTime", it->getElectionTime());
- bb.appendDate("electionDate",
- Date_t::fromDurationSinceEpoch(
- Seconds(it->getElectionTime().getSecs())));
- }
- bb.appendIntOrLL("configVersion", it->getConfigVersion());
- membersOut.push_back(bb.obj());
+ if (state == MemberState::RS_PRIMARY) {
+ bb.append("electionTime", it->getElectionTime());
+ bb.appendDate(
+ "electionDate",
+ Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
}
+ bb.appendIntOrLL("configVersion", it->getConfigVersion());
+ membersOut.push_back(bb.obj());
}
+ }
- // sort members bson
- sort(membersOut.begin(), membersOut.end());
-
- response->append("set",
- _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
- response->append("date", now);
- response->append("myState", myState.s);
+ // sort members bson
+ sort(membersOut.begin(), membersOut.end());
- // Add sync source info
- if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
- response->append("syncingTo", _syncSource.toString());
- }
+ response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
+ response->append("date", now);
+ response->append("myState", myState.s);
- response->append("members", membersOut);
- *result = Status::OK();
+ // Add sync source info
+ if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
+ response->append("syncingTo", _syncSource.toString());
}
- void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
+ response->append("members", membersOut);
+ *result = Status::OK();
+}
- const MemberState myState = getMemberState();
- if (!_rsConfig.isInitialized() || myState.removed()) {
- response->markAsNoConfig();
- return;
- }
-
- response->setReplSetName(_rsConfig.getReplSetName());
- response->setReplSetVersion(_rsConfig.getConfigVersion());
- response->setIsMaster(myState.primary());
- response->setIsSecondary(myState.secondary());
+void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
+ const MemberState myState = getMemberState();
+ if (!_rsConfig.isInitialized() || myState.removed()) {
+ response->markAsNoConfig();
+ return;
+ }
- {
- for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
- it != _rsConfig.membersEnd(); ++it) {
- if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
- continue;
- }
+ response->setReplSetName(_rsConfig.getReplSetName());
+ response->setReplSetVersion(_rsConfig.getConfigVersion());
+ response->setIsMaster(myState.primary());
+ response->setIsSecondary(myState.secondary());
- if (it->isElectable()) {
- response->addHost(it->getHostAndPort());
- }
- else if (it->isArbiter()) {
- response->addArbiter(it->getHostAndPort());
- }
- else {
- response->addPassive(it->getHostAndPort());
- }
+ {
+ for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
+ it != _rsConfig.membersEnd();
+ ++it) {
+ if (it->isHidden() || it->getSlaveDelay() > Seconds{0}) {
+ continue;
}
- }
-
- const MemberConfig* curPrimary = _currentPrimaryMember();
- if (curPrimary) {
- response->setPrimary(curPrimary->getHostAndPort());
- }
- const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
- if (selfConfig.isArbiter()) {
- response->setIsArbiterOnly(true);
- }
- else if (selfConfig.getPriority() == 0) {
- response->setIsPassive(true);
- }
- if (selfConfig.getSlaveDelay().count()) {
- response->setSlaveDelay(selfConfig.getSlaveDelay());
- }
- if (selfConfig.isHidden()) {
- response->setIsHidden(true);
- }
- if (!selfConfig.shouldBuildIndexes()) {
- response->setShouldBuildIndexes(false);
- }
- const ReplicaSetTagConfig tagConfig = _rsConfig.getTagConfig();
- if (selfConfig.hasTags(tagConfig)) {
- for (MemberConfig::TagIterator tag = selfConfig.tagsBegin();
- tag != selfConfig.tagsEnd(); ++tag) {
- std::string tagKey = tagConfig.getTagKey(*tag);
- if (tagKey[0] == '$') {
- // Filter out internal tags
- continue;
- }
- response->addTag(tagKey, tagConfig.getTagValue(*tag));
+ if (it->isElectable()) {
+ response->addHost(it->getHostAndPort());
+ } else if (it->isArbiter()) {
+ response->addArbiter(it->getHostAndPort());
+ } else {
+ response->addPassive(it->getHostAndPort());
}
}
- response->setMe(selfConfig.getHostAndPort());
- if (_iAmPrimary()) {
- response->setElectionId(_electionId);
- }
}
- void TopologyCoordinatorImpl::prepareFreezeResponse(
- Date_t now, int secs, BSONObjBuilder* response) {
-
- if (secs == 0) {
- _stepDownUntil = now;
- log() << "'unfreezing'";
- response->append("info", "unfreezing");
-
- if (_followerMode == MemberState::RS_SECONDARY &&
- _rsConfig.getNumMembers() == 1 &&
- _selfIndex == 0 &&
- _rsConfig.getMemberAt(_selfIndex).isElectable()) {
- // If we are a one-node replica set, we're the one member,
- // we're electable, and we are currently in followerMode SECONDARY,
- // we must transition to candidate now that our stepdown period
- // is no longer active, in leiu of heartbeats.
- _role = Role::candidate;
- }
- }
- else {
- if ( secs == 1 )
- response->append("warning", "you really want to freeze for only 1 second?");
+ const MemberConfig* curPrimary = _currentPrimaryMember();
+ if (curPrimary) {
+ response->setPrimary(curPrimary->getHostAndPort());
+ }
- if (!_iAmPrimary()) {
- _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
- log() << "'freezing' for " << secs << " seconds";
- }
- else {
- log() << "received freeze command but we are primary";
+ const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
+ if (selfConfig.isArbiter()) {
+ response->setIsArbiterOnly(true);
+ } else if (selfConfig.getPriority() == 0) {
+ response->setIsPassive(true);
+ }
+ if (selfConfig.getSlaveDelay().count()) {
+ response->setSlaveDelay(selfConfig.getSlaveDelay());
+ }
+ if (selfConfig.isHidden()) {
+ response->setIsHidden(true);
+ }
+ if (!selfConfig.shouldBuildIndexes()) {
+ response->setShouldBuildIndexes(false);
+ }
+ const ReplicaSetTagConfig tagConfig = _rsConfig.getTagConfig();
+ if (selfConfig.hasTags(tagConfig)) {
+ for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
+ ++tag) {
+ std::string tagKey = tagConfig.getTagKey(*tag);
+ if (tagKey[0] == '$') {
+ // Filter out internal tags
+ continue;
}
+ response->addTag(tagKey, tagConfig.getTagValue(*tag));
}
}
+ response->setMe(selfConfig.getHostAndPort());
+ if (_iAmPrimary()) {
+ response->setElectionId(_electionId);
+ }
+}
- bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
- if (_stepDownUntil > now) {
- return false;
- }
+void TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now,
+ int secs,
+ BSONObjBuilder* response) {
+ if (secs == 0) {
+ _stepDownUntil = now;
+ log() << "'unfreezing'";
+ response->append("info", "unfreezing");
- if (_followerMode == MemberState::RS_SECONDARY &&
- _rsConfig.getNumMembers() == 1 &&
- _selfIndex == 0 &&
- _rsConfig.getMemberAt(_selfIndex).isElectable()) {
- // If the new config describes a one-node replica set, we're the one member,
+ if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
+ _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) {
+ // If we are a one-node replica set, we're the one member,
// we're electable, and we are currently in followerMode SECONDARY,
- // we must transition to candidate, in leiu of heartbeats.
+ // we must transition to candidate now that our stepdown period
+ // is no longer active, in leiu of heartbeats.
_role = Role::candidate;
- return true;
}
- return false;
- }
+ } else {
+ if (secs == 1)
+ response->append("warning", "you really want to freeze for only 1 second?");
- void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) {
- if (_electionSleepUntil < newTime) {
- _electionSleepUntil = newTime;
+ if (!_iAmPrimary()) {
+ _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
+ log() << "'freezing' for " << secs << " seconds";
+ } else {
+ log() << "received freeze command but we are primary";
}
}
+}
- Timestamp TopologyCoordinatorImpl::getElectionTime() const {
- return _electionTime;
- }
-
- OID TopologyCoordinatorImpl::getElectionId() const {
- return _electionId;
- }
-
- int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const {
- return _currentPrimaryIndex;
+bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
+ if (_stepDownUntil > now) {
+ return false;
}
- Date_t TopologyCoordinatorImpl::getStepDownTime() const {
- return _stepDownUntil;
+ if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
+ _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) {
+ // If the new config describes a one-node replica set, we're the one member,
+ // we're electable, and we are currently in followerMode SECONDARY,
+ // we must transition to candidate, in leiu of heartbeats.
+ _role = Role::candidate;
+ return true;
}
-
- void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplicaSetConfig& newConfig,
- int selfIndex,
- Date_t now) {
- std::vector<MemberHeartbeatData> oldHeartbeats;
- _hbdata.swap(oldHeartbeats);
-
- int index = 0;
- for (ReplicaSetConfig::MemberIterator it = newConfig.membersBegin();
- it != newConfig.membersEnd();
- ++it, ++index) {
- const MemberConfig& newMemberConfig = *it;
- // TODO: C++11: use emplace_back()
- if (index == selfIndex) {
- // Insert placeholder for ourself, though we will never consult it.
- _hbdata.push_back(MemberHeartbeatData());
- }
- else {
- MemberHeartbeatData newHeartbeatData;
- for (int oldIndex = 0; oldIndex < _rsConfig.getNumMembers(); ++oldIndex) {
- const MemberConfig& oldMemberConfig = _rsConfig.getMemberAt(oldIndex);
- if (oldMemberConfig.getId() == newMemberConfig.getId() &&
- oldMemberConfig.getHostAndPort() == newMemberConfig.getHostAndPort()) {
- // This member existed in the old config with the same member ID and
- // HostAndPort, so copy its heartbeat data over.
- newHeartbeatData = oldHeartbeats[oldIndex];
- break;
- }
+ return false;
+}
+
+void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) {
+ if (_electionSleepUntil < newTime) {
+ _electionSleepUntil = newTime;
+ }
+}
+
+Timestamp TopologyCoordinatorImpl::getElectionTime() const {
+ return _electionTime;
+}
+
+OID TopologyCoordinatorImpl::getElectionId() const {
+ return _electionId;
+}
+
+int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const {
+ return _currentPrimaryIndex;
+}
+
+Date_t TopologyCoordinatorImpl::getStepDownTime() const {
+ return _stepDownUntil;
+}
+
+void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplicaSetConfig& newConfig,
+ int selfIndex,
+ Date_t now) {
+ std::vector<MemberHeartbeatData> oldHeartbeats;
+ _hbdata.swap(oldHeartbeats);
+
+ int index = 0;
+ for (ReplicaSetConfig::MemberIterator it = newConfig.membersBegin();
+ it != newConfig.membersEnd();
+ ++it, ++index) {
+ const MemberConfig& newMemberConfig = *it;
+ // TODO: C++11: use emplace_back()
+ if (index == selfIndex) {
+ // Insert placeholder for ourself, though we will never consult it.
+ _hbdata.push_back(MemberHeartbeatData());
+ } else {
+ MemberHeartbeatData newHeartbeatData;
+ for (int oldIndex = 0; oldIndex < _rsConfig.getNumMembers(); ++oldIndex) {
+ const MemberConfig& oldMemberConfig = _rsConfig.getMemberAt(oldIndex);
+ if (oldMemberConfig.getId() == newMemberConfig.getId() &&
+ oldMemberConfig.getHostAndPort() == newMemberConfig.getHostAndPort()) {
+ // This member existed in the old config with the same member ID and
+ // HostAndPort, so copy its heartbeat data over.
+ newHeartbeatData = oldHeartbeats[oldIndex];
+ break;
}
- _hbdata.push_back(newHeartbeatData);
}
+ _hbdata.push_back(newHeartbeatData);
}
}
+}
- // This function installs a new config object and recreates MemberHeartbeatData objects
- // that reflect the new config.
- void TopologyCoordinatorImpl::updateConfig(const ReplicaSetConfig& newConfig,
- int selfIndex,
- Date_t now,
- const OpTime& lastOpApplied) {
- invariant(_role != Role::candidate);
- invariant(selfIndex < newConfig.getNumMembers());
+// This function installs a new config object and recreates MemberHeartbeatData objects
+// that reflect the new config.
+void TopologyCoordinatorImpl::updateConfig(const ReplicaSetConfig& newConfig,
+ int selfIndex,
+ Date_t now,
+ const OpTime& lastOpApplied) {
+ invariant(_role != Role::candidate);
+ invariant(selfIndex < newConfig.getNumMembers());
- _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
- _rsConfig = newConfig;
- _selfIndex = selfIndex;
- _forceSyncSourceIndex = -1;
+ _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
+ _rsConfig = newConfig;
+ _selfIndex = selfIndex;
+ _forceSyncSourceIndex = -1;
- if (_role == Role::leader) {
- if (_selfIndex == -1) {
- log() << "Could not remain primary because no longer a member of the replica set";
- }
- else if (!_selfConfig().isElectable()) {
- log() <<" Could not remain primary because no longer electable";
- }
- else {
- // Don't stepdown if you don't have to.
- _currentPrimaryIndex = _selfIndex;
- return;
- }
- _role = Role::follower;
+ if (_role == Role::leader) {
+ if (_selfIndex == -1) {
+ log() << "Could not remain primary because no longer a member of the replica set";
+ } else if (!_selfConfig().isElectable()) {
+ log() << " Could not remain primary because no longer electable";
+ } else {
+ // Don't stepdown if you don't have to.
+ _currentPrimaryIndex = _selfIndex;
+ return;
}
+ _role = Role::follower;
+ }
- // By this point we know we are in Role::follower
- _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is
- _stepDownPending = false;
-
- if (_followerMode == MemberState::RS_SECONDARY &&
- _rsConfig.getNumMembers() == 1 &&
- _selfIndex == 0 &&
- _rsConfig.getMemberAt(_selfIndex).isElectable()) {
- // If the new config describes a one-node replica set, we're the one member,
- // we're electable, and we are currently in followerMode SECONDARY,
- // we must transition to candidate, in leiu of heartbeats.
- _role = Role::candidate;
- }
+ // By this point we know we are in Role::follower
+ _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is
+ _stepDownPending = false;
+
+ if (_followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
+ _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable()) {
+ // If the new config describes a one-node replica set, we're the one member,
+ // we're electable, and we are currently in followerMode SECONDARY,
+ // we must transition to candidate, in leiu of heartbeats.
+ _role = Role::candidate;
}
- std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
- // ignore messages over 2 minutes old
- if ((now - _hbmsgTime) > Seconds{120}) {
- return "";
- }
- return _hbmsg;
+}
+std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
+ // ignore messages over 2 minutes old
+ if ((now - _hbmsgTime) > Seconds{120}) {
+ return "";
}
+ return _hbmsg;
+}
- void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now,
- const std::string& message) {
- _hbmsgTime = now;
- _hbmsg = message;
- }
+void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
+ _hbmsgTime = now;
+ _hbmsg = message;
+}
+
+const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
+ return _rsConfig.getMemberAt(_selfIndex);
+}
- const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
- return _rsConfig.getMemberAt(_selfIndex);
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
+ int index, const OpTime& lastOpApplied) const {
+ invariant(index != _selfIndex);
+ const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
+ const MemberHeartbeatData& hbData = _hbdata[index];
+ UnelectableReasonMask result = None;
+ if (memberConfig.isArbiter()) {
+ result |= ArbiterIAm;
+ }
+ if (memberConfig.getPriority() <= 0) {
+ result |= NoPriority;
+ }
+ if (hbData.getState() != MemberState::RS_SECONDARY) {
+ result |= NotSecondary;
+ }
+ if (!_isOpTimeCloseEnoughToLatestToElect(hbData.getOpTime(), lastOpApplied)) {
+ result |= NotCloseEnoughToLatestOptime;
+ }
+ if (hbData.up() && hbData.isUnelectable()) {
+ result |= RefusesToStand;
}
+ invariant(result || memberConfig.isElectable());
+ return result;
+}
- TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
- int index,
- const OpTime& lastOpApplied) const {
- invariant(index != _selfIndex);
- const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
- const MemberHeartbeatData& hbData = _hbdata[index];
- UnelectableReasonMask result = None;
- if (memberConfig.isArbiter()) {
- result |= ArbiterIAm;
- }
- if (memberConfig.getPriority() <= 0) {
- result |= NoPriority;
- }
- if (hbData.getState() != MemberState::RS_SECONDARY) {
- result |= NotSecondary;
- }
- if (!_isOpTimeCloseEnoughToLatestToElect(hbData.getOpTime(), lastOpApplied)) {
- result |= NotCloseEnoughToLatestOptime;
- }
- if (hbData.up() && hbData.isUnelectable()) {
- result |= RefusesToStand;
- }
- invariant(result || memberConfig.isElectable());
+TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
+ const Date_t now, const OpTime& lastApplied) const {
+ UnelectableReasonMask result = None;
+ if (lastApplied.isNull()) {
+ result |= NoData;
+ }
+ if (!_aMajoritySeemsToBeUp()) {
+ result |= CannotSeeMajority;
+ }
+ if (_selfIndex == -1) {
+ result |= NotInitialized;
return result;
}
+ if (_selfConfig().isArbiter()) {
+ result |= ArbiterIAm;
+ }
+ if (_selfConfig().getPriority() <= 0) {
+ result |= NoPriority;
+ }
+ if (_stepDownUntil > now) {
+ result |= StepDownPeriodActive;
+ }
+ if (_voteLease.whoId != -1 && _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
+ _voteLease.when + VoteLease::leaseTime >= now) {
+ result |= VotedTooRecently;
+ }
- TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
- const Date_t now,
- const OpTime& lastApplied) const {
-
- UnelectableReasonMask result = None;
- if (lastApplied.isNull()) {
- result |= NoData;
- }
- if (!_aMajoritySeemsToBeUp()) {
- result |= CannotSeeMajority;
- }
- if (_selfIndex == -1) {
- result |= NotInitialized;
- return result;
- }
- if (_selfConfig().isArbiter()) {
- result |= ArbiterIAm;
- }
- if (_selfConfig().getPriority() <= 0) {
- result |= NoPriority;
- }
- if (_stepDownUntil > now) {
- result |= StepDownPeriodActive;
- }
- if (_voteLease.whoId != -1 &&
- _voteLease.whoId !=_rsConfig.getMemberAt(_selfIndex).getId() &&
- _voteLease.when + VoteLease::leaseTime >= now) {
- result |= VotedTooRecently;
- }
-
- // Cannot be electable unless secondary or already primary
- if (!getMemberState().secondary() && !_iAmPrimary()) {
- result |= NotSecondary;
- }
- if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied, lastApplied)) {
- result |= NotCloseEnoughToLatestOptime;
- }
- return result;
+ // Cannot be electable unless secondary or already primary
+ if (!getMemberState().secondary() && !_iAmPrimary()) {
+ result |= NotSecondary;
+ }
+ if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied, lastApplied)) {
+ result |= NotCloseEnoughToLatestOptime;
}
+ return result;
+}
- std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
- const UnelectableReasonMask ur) const {
- invariant(ur);
- str::stream ss;
- bool hasWrittenToStream = false;
- if (ur & NoData) {
- ss << "node has no applied oplog entries";
- hasWrittenToStream = true;
- }
- if (ur & VotedTooRecently) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
- }
- if (ur & CannotSeeMajority) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I cannot see a majority";
- }
- if (ur & ArbiterIAm) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is an arbiter";
+std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
+ const UnelectableReasonMask ur) const {
+ invariant(ur);
+ str::stream ss;
+ bool hasWrittenToStream = false;
+ if (ur & NoData) {
+ ss << "node has no applied oplog entries";
+ hasWrittenToStream = true;
+ }
+ if (ur & VotedTooRecently) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & NoPriority) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member has zero priority";
+ hasWrittenToStream = true;
+ ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
+ }
+ if (ur & CannotSeeMajority) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & StepDownPeriodActive) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "I am still waiting for stepdown period to end at " <<
- dateToISOStringLocal(_stepDownUntil);
+ hasWrittenToStream = true;
+ ss << "I cannot see a majority";
+ }
+ if (ur & ArbiterIAm) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & NotSecondary) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is not currently a secondary";
+ hasWrittenToStream = true;
+ ss << "member is an arbiter";
+ }
+ if (ur & NoPriority) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & NotCloseEnoughToLatestOptime) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "member is more than 10 seconds behind the most up-to-date member";
+ hasWrittenToStream = true;
+ ss << "member has zero priority";
+ }
+ if (ur & StepDownPeriodActive) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & NotInitialized) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "node is not a member of a valid replica set configuration";
+ hasWrittenToStream = true;
+ ss << "I am still waiting for stepdown period to end at "
+ << dateToISOStringLocal(_stepDownUntil);
+ }
+ if (ur & NotSecondary) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (ur & RefusesToStand) {
- if (hasWrittenToStream) {
- ss << "; ";
- }
- hasWrittenToStream = true;
- ss << "most recent heartbeat indicates node will not stand for election";
+ hasWrittenToStream = true;
+ ss << "member is not currently a secondary";
+ }
+ if (ur & NotCloseEnoughToLatestOptime) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- if (!hasWrittenToStream) {
- severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
- fassertFailed(26011);
+ hasWrittenToStream = true;
+ ss << "member is more than 10 seconds behind the most up-to-date member";
+ }
+ if (ur & NotInitialized) {
+ if (hasWrittenToStream) {
+ ss << "; ";
}
- ss << " (mask 0x" << integerToHex(ur) << ")";
- return ss;
+ hasWrittenToStream = true;
+ ss << "node is not a member of a valid replica set configuration";
}
-
- int TopologyCoordinatorImpl::_getPing(const HostAndPort& host) {
- return _pings[host].getMillis();
+ if (ur & RefusesToStand) {
+ if (hasWrittenToStream) {
+ ss << "; ";
+ }
+ hasWrittenToStream = true;
+ ss << "most recent heartbeat indicates node will not stand for election";
}
-
- void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) {
- _electionTime = newElectionTime;
+ if (!hasWrittenToStream) {
+ severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
+ fassertFailed(26011);
}
+ ss << " (mask 0x" << integerToHex(ur) << ")";
+ return ss;
+}
- int TopologyCoordinatorImpl::_getTotalPings() {
- PingMap::iterator it = _pings.begin();
- PingMap::iterator end = _pings.end();
- int totalPings = 0;
- while (it != end) {
- totalPings += it->second.getCount();
- it++;
- }
- return totalPings;
- }
+int TopologyCoordinatorImpl::_getPing(const HostAndPort& host) {
+ return _pings[host].getMillis();
+}
- std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const {
- std::vector<HostAndPort> upHosts;
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- if (itIndex == _selfIndex) {
- continue; // skip ourselves
- }
- if (!it->maybeUp()) {
- continue; // skip DOWN nodes
- }
+void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) {
+ _electionTime = newElectionTime;
+}
- upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
- }
- return upHosts;
+int TopologyCoordinatorImpl::_getTotalPings() {
+ PingMap::iterator it = _pings.begin();
+ PingMap::iterator end = _pings.end();
+ int totalPings = 0;
+ while (it != end) {
+ totalPings += it->second.getCount();
+ it++;
}
+ return totalPings;
+}
- bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
- if (_role != Role::candidate) {
- return false;
+std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const {
+ std::vector<HostAndPort> upHosts;
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ if (itIndex == _selfIndex) {
+ continue; // skip ourselves
}
- int selfId = _selfConfig().getId();
- if ((_voteLease.when + VoteLease::leaseTime >= now)
- && (_voteLease.whoId != selfId)) {
- log() << "not voting yea for " << selfId <<
- " voted for " << _voteLease.whoHostAndPort.toString() << ' ' <<
- durationCount<Seconds>(now - _voteLease.when) << " secs ago";
- return false;
+ if (!it->maybeUp()) {
+ continue; // skip DOWN nodes
}
- _voteLease.when = now;
- _voteLease.whoId = selfId;
- _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
- return true;
+
+ upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
}
+ return upHosts;
+}
- MemberState TopologyCoordinatorImpl::getMemberState() const {
- if (_selfIndex == -1) {
- if (_rsConfig.isInitialized()) {
- return MemberState::RS_REMOVED;
- }
- return MemberState::RS_STARTUP;
- }
- if (_role == Role::leader) {
- invariant(_currentPrimaryIndex == _selfIndex);
- return MemberState::RS_PRIMARY;
- }
- const MemberConfig& myConfig = _selfConfig();
- if (myConfig.isArbiter()) {
- return MemberState::RS_ARBITER;
- }
- if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_hbdata, _selfIndex)))
- && (_followerMode == MemberState::RS_SECONDARY)) {
- return MemberState::RS_RECOVERING;
- }
- return _followerMode;
+bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
+ if (_role != Role::candidate) {
+ return false;
}
+ int selfId = _selfConfig().getId();
+ if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
+ log() << "not voting yea for " << selfId << " voted for "
+ << _voteLease.whoHostAndPort.toString() << ' '
+ << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
+ return false;
+ }
+ _voteLease.when = now;
+ _voteLease.whoId = selfId;
+ _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
+ return true;
+}
- void TopologyCoordinatorImpl::processWinElection(
- OID electionId,
- Timestamp electionOpTime) {
- invariant(_role == Role::candidate);
- _electionTime = electionOpTime;
- _electionId = electionId;
- _role = Role::leader;
- _currentPrimaryIndex = _selfIndex;
- _syncSource = HostAndPort();
- _forceSyncSourceIndex = -1;
+MemberState TopologyCoordinatorImpl::getMemberState() const {
+ if (_selfIndex == -1) {
+ if (_rsConfig.isInitialized()) {
+ return MemberState::RS_REMOVED;
+ }
+ return MemberState::RS_STARTUP;
+ }
+ if (_role == Role::leader) {
+ invariant(_currentPrimaryIndex == _selfIndex);
+ return MemberState::RS_PRIMARY;
+ }
+ const MemberConfig& myConfig = _selfConfig();
+ if (myConfig.isArbiter()) {
+ return MemberState::RS_ARBITER;
}
+ if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_hbdata, _selfIndex))) &&
+ (_followerMode == MemberState::RS_SECONDARY)) {
+ return MemberState::RS_RECOVERING;
+ }
+ return _followerMode;
+}
- void TopologyCoordinatorImpl::processLoseElection() {
- invariant(_role == Role::candidate);
- const HostAndPort syncSourceAddress = getSyncSourceAddress();
- _electionTime = Timestamp(0, 0);
- _electionId = OID();
- _role = Role::follower;
+void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) {
+ invariant(_role == Role::candidate);
+ _electionTime = electionOpTime;
+ _electionId = electionId;
+ _role = Role::leader;
+ _currentPrimaryIndex = _selfIndex;
+ _syncSource = HostAndPort();
+ _forceSyncSourceIndex = -1;
+}
- // Clear voteLease time, if we voted for ourselves in this election.
- // This will allow us to vote for others.
- if (_voteLease.whoId == _selfConfig().getId()) {
- _voteLease.when = Date_t();
- }
+void TopologyCoordinatorImpl::processLoseElection() {
+ invariant(_role == Role::candidate);
+ const HostAndPort syncSourceAddress = getSyncSourceAddress();
+ _electionTime = Timestamp(0, 0);
+ _electionId = OID();
+ _role = Role::follower;
+
+ // Clear voteLease time, if we voted for ourselves in this election.
+ // This will allow us to vote for others.
+ if (_voteLease.whoId == _selfConfig().getId()) {
+ _voteLease.when = Date_t();
}
+}
- bool TopologyCoordinatorImpl::stepDown(Date_t until, bool force, const OpTime& lastOpApplied) {
- bool canStepDown = force;
- for (int i = 0; !canStepDown && i < _rsConfig.getNumMembers(); ++i) {
- if (i == _selfIndex) {
- continue;
- }
- UnelectableReasonMask reason = _getUnelectableReason(i, lastOpApplied);
- if (!reason && _hbdata[i].getOpTime() >= lastOpApplied) {
- canStepDown = true;
- }
+bool TopologyCoordinatorImpl::stepDown(Date_t until, bool force, const OpTime& lastOpApplied) {
+ bool canStepDown = force;
+ for (int i = 0; !canStepDown && i < _rsConfig.getNumMembers(); ++i) {
+ if (i == _selfIndex) {
+ continue;
}
-
- if (!canStepDown) {
- return false;
+ UnelectableReasonMask reason = _getUnelectableReason(i, lastOpApplied);
+ if (!reason && _hbdata[i].getOpTime() >= lastOpApplied) {
+ canStepDown = true;
}
- _stepDownUntil = until;
- _stepDownSelfAndReplaceWith(-1);
- return true;
}
- void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
- invariant(_role == Role::follower);
- switch (newMode) {
+ if (!canStepDown) {
+ return false;
+ }
+ _stepDownUntil = until;
+ _stepDownSelfAndReplaceWith(-1);
+ return true;
+}
+
+void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
+ invariant(_role == Role::follower);
+ switch (newMode) {
case MemberState::RS_RECOVERING:
case MemberState::RS_ROLLBACK:
case MemberState::RS_SECONDARY:
@@ -2104,223 +1992,209 @@ namespace {
break;
default:
invariant(false);
- }
-
- if (_followerMode != MemberState::RS_SECONDARY) {
- return;
- }
-
- // When a single node replica set transitions to SECONDARY, we must check if we should
- // be a candidate here. This is necessary because a single node replica set has no
- // heartbeats that would normally change the role to candidate.
-
- if (_rsConfig.getNumMembers() == 1 &&
- _selfIndex == 0 &&
- _rsConfig.getMemberAt(_selfIndex).isElectable()) {
- _role = Role::candidate;
- }
}
- bool TopologyCoordinatorImpl::stepDownIfPending() {
- if (!_stepDownPending) {
- return false;
- }
-
- int remotePrimaryIndex = -1;
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end(); ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- if (itIndex == _selfIndex) {
- continue;
- }
-
- if (it->getState().primary() && it->up()) {
- if (remotePrimaryIndex != -1) {
- // two other nodes think they are primary (asynchronously polled)
- // -- wait for things to settle down.
- remotePrimaryIndex = -1;
- warning() << "two remote primaries (transiently)";
- break;
- }
- remotePrimaryIndex = itIndex;
- }
- }
- _stepDownSelfAndReplaceWith(remotePrimaryIndex);
- return true;
- }
-
- void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
- invariant(_role == Role::leader);
- invariant(_selfIndex != -1);
- invariant(_selfIndex != newPrimary);
- invariant(_selfIndex == _currentPrimaryIndex);
- _currentPrimaryIndex = newPrimary;
- _role = Role::follower;
- _stepDownPending = false;
+ if (_followerMode != MemberState::RS_SECONDARY) {
+ return;
}
- void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) {
- invariant(_role == Role::follower);
- _maintenanceModeCalls += inc;
- invariant(_maintenanceModeCalls >= 0);
- }
-
- int TopologyCoordinatorImpl::getMaintenanceCount() const {
- return _maintenanceModeCalls;
- }
+ // When a single node replica set transitions to SECONDARY, we must check if we should
+ // be a candidate here. This is necessary because a single node replica set has no
+ // heartbeats that would normally change the role to candidate.
- bool TopologyCoordinatorImpl::updateTerm(long long term) {
- if (term <= _term) {
- return false;
- }
- _term = term;
- return true;
+ if (_rsConfig.getNumMembers() == 1 && _selfIndex == 0 &&
+ _rsConfig.getMemberAt(_selfIndex).isElectable()) {
+ _role = Role::candidate;
}
+}
- long long TopologyCoordinatorImpl::getTerm() const {
- return _term;
+bool TopologyCoordinatorImpl::stepDownIfPending() {
+ if (!_stepDownPending) {
+ return false;
}
- bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- Date_t now) const {
- // Methodology:
- // If there exists a viable sync source member other than currentSource, whose oplog has
- // reached an optime greater than _maxSyncSourceLagSecs later than currentSource's, return
- // true.
-
- // If the user requested a sync source change, return true.
- if (_forceSyncSourceIndex != -1) {
- return true;
- }
-
- const int currentMemberIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
- if (currentMemberIndex == -1) {
- return true;
- }
- invariant(currentMemberIndex != _selfIndex);
-
- OpTime currentOpTime = _hbdata[currentMemberIndex].getOpTime();
- if (currentOpTime.isNull()) {
- // Haven't received a heartbeat from the sync source yet, so can't tell if we should
- // change.
- return false;
+ int remotePrimaryIndex = -1;
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ if (itIndex == _selfIndex) {
+ continue;
}
- unsigned int currentSecs = currentOpTime.getSecs();
- unsigned int goalSecs = currentSecs + _maxSyncSourceLagSecs.count();
- for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin();
- it != _hbdata.end();
- ++it) {
- const int itIndex = indexOfIterator(_hbdata, it);
- const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
- if (it->up() &&
- (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
- it->getState().readable() &&
- !_memberIsBlacklisted(candidateConfig, now) &&
- goalSecs < it->getOpTime().getSecs()) {
- log() << "changing sync target because current sync target's most recent OpTime is "
- << currentOpTime.toString() << " which is more than "
- << _maxSyncSourceLagSecs.count() << " seconds behind member "
- << candidateConfig.getHostAndPort().toString()
- << " whose most recent OpTime is " << it->getOpTime().toString();
- invariant(itIndex != _selfIndex);
- return true;
+ if (it->getState().primary() && it->up()) {
+ if (remotePrimaryIndex != -1) {
+ // two other nodes think they are primary (asynchronously polled)
+ // -- wait for things to settle down.
+ remotePrimaryIndex = -1;
+ warning() << "two remote primaries (transiently)";
+ break;
}
+ remotePrimaryIndex = itIndex;
}
- return false;
}
+ _stepDownSelfAndReplaceWith(remotePrimaryIndex);
+ return true;
+}
- void TopologyCoordinatorImpl::prepareCursorResponseInfo(
- BSONObjBuilder* objBuilder,
- const OpTime& lastCommittedOpTime) const {
- objBuilder->append("term", _term);
- objBuilder->append("lastOpCommittedTimestamp", lastCommittedOpTime.getTimestamp());
- objBuilder->append("lastOpCommittedTerm", lastCommittedOpTime.getTerm());
- objBuilder->append("configVersion", _rsConfig.getConfigVersion());
- objBuilder->append("primaryId", _rsConfig.getMemberAt(_currentPrimaryIndex).getId());
- }
+void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
+ invariant(_role == Role::leader);
+ invariant(_selfIndex != -1);
+ invariant(_selfIndex != newPrimary);
+ invariant(_selfIndex == _currentPrimaryIndex);
+ _currentPrimaryIndex = newPrimary;
+ _role = Role::follower;
+ _stepDownPending = false;
+}
- void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
- output->setConfig(_rsConfig);
- output->setHBData(_hbdata);
- output->setSelfIndex(_selfIndex);
- output->setPrimaryIndex(_currentPrimaryIndex);
- output->setSelfState(getMemberState());
- output->setSelfHeartbeatMessage(_hbmsg);
- }
+void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) {
+ invariant(_role == Role::follower);
+ _maintenanceModeCalls += inc;
+ invariant(_maintenanceModeCalls >= 0);
+}
- void TopologyCoordinatorImpl::processReplSetRequestVotes(
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- const OpTime& lastAppliedOpTime) {
- response->setOk(true);
- response->setTerm(_term);
-
- if (args.getTerm() < _term) {
- response->setVoteGranted(false);
- response->setReason("candidate's term is lower than mine");
- }
- else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
- response->setVoteGranted(false);
- response->setReason("candidate's config version differs from mine");
- }
- else if (args.getSetName() != _rsConfig.getReplSetName()) {
- response->setVoteGranted(false);
- response->setReason("candidate's set name differs from mine");
- }
- else if (args.getLastCommittedOp() < lastAppliedOpTime) {
- response->setVoteGranted(false);
- response->setReason("candidate's data is staler than mine");
- }
- else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
- response->setVoteGranted(false);
- response->setReason("already voted for another candidate this term");
- }
- else {
- if (!args.isADryRun()) {
- _lastVote.setTerm(args.getTerm());
- _lastVote.setCandidateId(args.getCandidateId());
- }
- response->setVoteGranted(true);
- }
+int TopologyCoordinatorImpl::getMaintenanceCount() const {
+ return _maintenanceModeCalls;
+}
+bool TopologyCoordinatorImpl::updateTerm(long long term) {
+ if (term <= _term) {
+ return false;
}
+ _term = term;
+ return true;
+}
- Status TopologyCoordinatorImpl::processReplSetDeclareElectionWinner(
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm) {
- *responseTerm = _term;
- if (args.getReplSetName() != _rsConfig.getReplSetName()) {
- return {ErrorCodes::BadValue, "replSet name does not match"};
- }
- else if (args.getTerm() < _term) {
- return {ErrorCodes::BadValue, "term has already passed"};
- }
- else if (args.getTerm() == _term && _currentPrimaryIndex > -1 &&
- args.getWinnerId() != _rsConfig.getMemberAt(_currentPrimaryIndex).getId()) {
- return {ErrorCodes::BadValue, "term already has a primary"};
- }
+long long TopologyCoordinatorImpl::getTerm() const {
+ return _term;
+}
- _currentPrimaryIndex = _rsConfig.findMemberIndexByConfigId(args.getWinnerId());
- return Status::OK();
- }
+bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
+ Date_t now) const {
+ // Methodology:
+ // If there exists a viable sync source member other than currentSource, whose oplog has
+ // reached an optime greater than _maxSyncSourceLagSecs later than currentSource's, return
+ // true.
- void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) {
- _lastVote = lastVote;
+ // If the user requested a sync source change, return true.
+ if (_forceSyncSourceIndex != -1) {
+ return true;
}
- long long TopologyCoordinatorImpl::getTerm() {
- return _term;
+ const int currentMemberIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
+ if (currentMemberIndex == -1) {
+ return true;
}
+ invariant(currentMemberIndex != _selfIndex);
- void TopologyCoordinatorImpl::incrementTerm() {
- _term++;
+ OpTime currentOpTime = _hbdata[currentMemberIndex].getOpTime();
+ if (currentOpTime.isNull()) {
+ // Haven't received a heartbeat from the sync source yet, so can't tell if we should
+ // change.
+ return false;
}
-
- void TopologyCoordinatorImpl::voteForMyselfV1() {
- _lastVote.setTerm(_term);
- _lastVote.setCandidateId(_selfConfig().getId());
+ unsigned int currentSecs = currentOpTime.getSecs();
+ unsigned int goalSecs = currentSecs + _maxSyncSourceLagSecs.count();
+
+ for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); it != _hbdata.end();
+ ++it) {
+ const int itIndex = indexOfIterator(_hbdata, it);
+ const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
+ if (it->up() &&
+ (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
+ it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
+ goalSecs < it->getOpTime().getSecs()) {
+ log() << "changing sync target because current sync target's most recent OpTime is "
+ << currentOpTime.toString() << " which is more than "
+ << _maxSyncSourceLagSecs.count() << " seconds behind member "
+ << candidateConfig.getHostAndPort().toString() << " whose most recent OpTime is "
+ << it->getOpTime().toString();
+ invariant(itIndex != _selfIndex);
+ return true;
+ }
}
-
-} // namespace repl
-} // namespace mongo
+ return false;
+}
+
+void TopologyCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder,
+ const OpTime& lastCommittedOpTime) const {
+ objBuilder->append("term", _term);
+ objBuilder->append("lastOpCommittedTimestamp", lastCommittedOpTime.getTimestamp());
+ objBuilder->append("lastOpCommittedTerm", lastCommittedOpTime.getTerm());
+ objBuilder->append("configVersion", _rsConfig.getConfigVersion());
+ objBuilder->append("primaryId", _rsConfig.getMemberAt(_currentPrimaryIndex).getId());
+}
+
+void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
+ output->setConfig(_rsConfig);
+ output->setHBData(_hbdata);
+ output->setSelfIndex(_selfIndex);
+ output->setPrimaryIndex(_currentPrimaryIndex);
+ output->setSelfState(getMemberState());
+ output->setSelfHeartbeatMessage(_hbmsg);
+}
+
+void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response,
+ const OpTime& lastAppliedOpTime) {
+ response->setOk(true);
+ response->setTerm(_term);
+
+ if (args.getTerm() < _term) {
+ response->setVoteGranted(false);
+ response->setReason("candidate's term is lower than mine");
+ } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
+ response->setVoteGranted(false);
+ response->setReason("candidate's config version differs from mine");
+ } else if (args.getSetName() != _rsConfig.getReplSetName()) {
+ response->setVoteGranted(false);
+ response->setReason("candidate's set name differs from mine");
+ } else if (args.getLastCommittedOp() < lastAppliedOpTime) {
+ response->setVoteGranted(false);
+ response->setReason("candidate's data is staler than mine");
+ } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
+ response->setVoteGranted(false);
+ response->setReason("already voted for another candidate this term");
+ } else {
+ if (!args.isADryRun()) {
+ _lastVote.setTerm(args.getTerm());
+ _lastVote.setCandidateId(args.getCandidateId());
+ }
+ response->setVoteGranted(true);
+ }
+}
+
+Status TopologyCoordinatorImpl::processReplSetDeclareElectionWinner(
+ const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) {
+ *responseTerm = _term;
+ if (args.getReplSetName() != _rsConfig.getReplSetName()) {
+ return {ErrorCodes::BadValue, "replSet name does not match"};
+ } else if (args.getTerm() < _term) {
+ return {ErrorCodes::BadValue, "term has already passed"};
+ } else if (args.getTerm() == _term && _currentPrimaryIndex > -1 &&
+ args.getWinnerId() != _rsConfig.getMemberAt(_currentPrimaryIndex).getId()) {
+ return {ErrorCodes::BadValue, "term already has a primary"};
+ }
+
+ _currentPrimaryIndex = _rsConfig.findMemberIndexByConfigId(args.getWinnerId());
+ return Status::OK();
+}
+
+void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) {
+ _lastVote = lastVote;
+}
+
+long long TopologyCoordinatorImpl::getTerm() {
+ return _term;
+}
+
+void TopologyCoordinatorImpl::incrementTerm() {
+ _term++;
+}
+
+void TopologyCoordinatorImpl::voteForMyselfV1() {
+ _lastVote.setTerm(_term);
+ _lastVote.setCandidateId(_selfConfig().getId());
+}
+
+} // namespace repl
+} // namespace mongo