summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/topology_coordinator.h
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-11-27 10:32:07 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2017-11-28 15:59:13 -0500
commit26b42901e302db19cfbdbf3726b7be6fb624f4f5 (patch)
tree3e1a3e8f57c584ea3131b6fda11d84531b3077eb /src/mongo/db/repl/topology_coordinator.h
parente38cf83f058f2e29e41c4fe2f02b7fbc405ceb8c (diff)
downloadmongo-26b42901e302db19cfbdbf3726b7be6fb624f4f5.tar.gz
SERVER-30626 Remove TopologyCoordinator interface.
Diffstat (limited to 'src/mongo/db/repl/topology_coordinator.h')
-rw-r--r--src/mongo/db/repl/topology_coordinator.h931
1 files changed, 751 insertions, 180 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 046ff2217a6..eca071e590a 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -28,241 +28,735 @@
#pragma once
+#include <iosfwd>
#include <string>
-#include <vector>
-#include "mongo/bson/timestamp.h"
+#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/last_vote.h"
-#include "mongo/db/repl/member_data.h"
-#include "mongo/db/repl/member_state.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/server_options.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
-
-class OperationContext;
-
-namespace rpc {
-class ReplSetMetadata;
-} // namespace rpc
+class Timestamp;
namespace repl {
-
-static const Milliseconds UninitializedPing{-1};
+class HeartbeatResponseAction;
+class MemberData;
+class OpTime;
+class ReplSetHeartbeatArgs;
+class ReplSetConfig;
+class TagSubgroup;
+struct MemberState;
/**
- * Represents a latency measurement for each replica set member based on heartbeat requests.
- * The measurement is an average weighted 80% to the old value, and 20% to the new value.
+ * Replication Topology Coordinator
*
- * Also stores information about heartbeat progress and retries.
+ * This object is responsible for managing the topology of the cluster.
+ * Tasks include consensus and leader election, chaining, and configuration management.
+ * Methods of this class should be non-blocking.
*/
-class PingStats {
+class TopologyCoordinator {
+ MONGO_DISALLOW_COPYING(TopologyCoordinator);
+
public:
/**
- * Records that a new heartbeat request started at "now".
+ * Type that denotes the role of a node in the replication protocol.
*
- * This resets the failure count used in determining whether the next request to a target
- * should be a retry or a regularly scheduled heartbeat message.
+ * The role is distinct from MemberState, in that it only deals with the
+ * roles a node plays in the basic protocol -- leader, follower and candidate.
+ * The mapping between MemberState and Role is complex -- several MemberStates
+ * map to the follower role, and MemberState::RS_SECONDARY maps to either
+ * follower or candidate roles, e.g.
*/
- void start(Date_t now);
+ class Role {
+ public:
+ /**
+ * Constant indicating leader role.
+ */
+ static const Role leader;
+
+ /**
+ * Constant indicating follower role.
+ */
+ static const Role follower;
+
+ /**
+ * Constant indicating candidate role
+ */
+ static const Role candidate;
+
+ Role() {}
+
+ bool operator==(Role other) const {
+ return _value == other._value;
+ }
+ bool operator!=(Role other) const {
+ return _value != other._value;
+ }
+
+ std::string toString() const;
+
+ private:
+ explicit Role(int value);
+
+ int _value;
+ };
+
+ struct Options {
+ // A sync source is re-evaluated after it lags behind further than this amount.
+ Seconds maxSyncSourceLagSecs{0};
+
+ // Whether or not this node is running as a config server.
+ ClusterRole clusterRole{ClusterRole::None};
+ };
/**
- * Records that a heartbeat request completed successfully, and that "millis" milliseconds
- * were spent for a single network roundtrip plus remote processing time.
+ * Constructs a Topology Coordinator object.
+ **/
+ TopologyCoordinator(Options options);
+
+
+ ~TopologyCoordinator();
+
+ /**
+ * Different modes a node can be in while still reporting itself as in state PRIMARY.
+ *
+ * Valid transitions:
+ *
+ * kNotLeader <----------------------------------
+ * | |
+ * | |
+ * | |
+ * v |
+ * kLeaderElect----- |
+ * | | |
+ * | | |
+ * v | |
+ * kMaster ------------------------- |
+ * | ^ | | |
+ * | | ------------------- | |
+ * | | | | | |
+ * v | v v v |
+ * kAttemptingStepDown----------->kSteppingDown |
+ * | | |
+ * | | |
+ * | | |
+ * ---------------------------------------------
+ *
*/
- void hit(Milliseconds millis);
+ enum class LeaderMode {
+ kNotLeader, // This node is not currently a leader.
+ kLeaderElect, // This node has been elected leader, but can't yet accept writes.
+ kMaster, // This node reports ismaster:true and can accept writes.
+ kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete.
+ kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail.
+ };
+
+ ////////////////////////////////////////////////////////////
+ //
+ // State inspection methods.
+ //
+ ////////////////////////////////////////////////////////////
/**
- * Records that a heartbeat request failed.
+ * Gets the role of this member in the replication protocol.
*/
- void miss();
+ Role getRole() const;
/**
- * Gets the number of hit() calls.
+ * Gets the MemberState of this member in the replica set.
*/
- unsigned int getCount() const {
- return count;
- }
+ MemberState getMemberState() const;
/**
- * Gets the weighted average round trip time for heartbeat messages to the target.
- * Returns 0 if there have been no pings recorded yet.
+ * Returns whether this node should be allowed to accept writes.
*/
- Milliseconds getMillis() const {
- return value == UninitializedPing ? Milliseconds(0) : value;
- }
+ bool canAcceptWrites() const;
/**
- * Gets the date at which start() was last called, which is used to determine if
- * a heartbeat should be retried or if the time limit has expired.
+ * Returns true if this node is in the process of stepping down. Note that this can be
+ * due to an unconditional stepdown that must succeed (for instance from learning about a new
+ * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that
+ * could fail if not enough nodes are caught up).
*/
- Date_t getLastHeartbeatStartDate() const {
- return _lastHeartbeatStartDate;
- }
+ bool isSteppingDown() const;
/**
- * Gets the number of failures since start() was last called.
+ * Returns the address of the current sync source, or an empty HostAndPort if there is no
+ * current sync source.
+ */
+ HostAndPort getSyncSourceAddress() const;
+
+ /**
+ * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor
+ * ourself.
+ */
+ std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
+
+ /**
+ * Gets the earliest time the current node will stand for election.
+ */
+ Date_t getStepDownTime() const;
+
+ /**
+ * Gets the current value of the maintenance mode counter.
+ */
+ int getMaintenanceCount() const;
+
+ /**
+ * Gets the latest term this member is aware of. If this member is the primary,
+ * it's the current term of the replica set.
+ */
+ long long getTerm() const;
+
+ enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm };
+
+ ////////////////////////////////////////////////////////////
+ //
+ // Basic state manipulation methods.
+ //
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Sets the latest term this member is aware of to the higher of its current value and
+ * the value passed in as "term".
+ * Returns the result of setting the term value, or if a stepdown should be triggered.
+ */
+ UpdateTermResult updateTerm(long long term, Date_t now);
+
+ /**
+ * Sets the index into the config used when we next choose a sync source
+ */
+ void setForceSyncSourceIndex(int index);
+
+ enum class ChainingPreference { kAllowChaining, kUseConfiguration };
+
+ /**
+ * Chooses and sets a new sync source, based on our current knowledge of the world.
+ */
+ HostAndPort chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ChainingPreference chainingPreference);
+
+ /**
+ * Suppresses selecting "host" as sync source until "until".
+ */
+ void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ /**
+ * Removes a single entry "host" from the list of potential sync sources which we
+ * have blacklisted, if it is supposed to be unblacklisted by "now".
+ */
+ void unblacklistSyncSource(const HostAndPort& host, Date_t now);
+
+ /**
+ * Clears the list of potential sync sources we have blacklisted.
+ */
+ void clearSyncSourceBlacklist();
+
+ /**
+ * Determines if a new sync source should be chosen, if a better candidate sync source is
+ * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than
+ * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
+ * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
+ * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
*
- * This value is incremented by calls to miss(), cleared by calls to start() and
- * set to the maximum possible value by calls to hit().
+ * "now" is used to skip over currently blacklisted sync sources.
+ *
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
- int getNumFailuresSinceLastStart() const {
- return _numFailuresSinceLastStart;
- }
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
+ Date_t now) const;
-private:
- unsigned int count = 0;
- Milliseconds value = UninitializedPing;
- Date_t _lastHeartbeatStartDate;
- int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
-};
+ /**
+ * Checks whether we are a single node set and we are not in a stepdown period. If so,
+ * puts us into candidate mode, otherwise does nothing. This is used to ensure that
+ * nodes in a single node replset become primary again when their stepdown period ends.
+ */
+ bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
-class TopologyCoordinatorImpl : public TopologyCoordinator {
-public:
- struct Options {
- // A sync source is re-evaluated after it lags behind further than this amount.
- Seconds maxSyncSourceLagSecs{0};
+ /**
+ * Sets the earliest time the current node will stand for election to "newTime".
+ *
+ * Until this time, while the node may report itself as electable, it will not stand
+ * for election.
+ */
+ void setElectionSleepUntil(Date_t newTime);
- // Whether or not this node is running as a config server.
- ClusterRole clusterRole{ClusterRole::None};
- };
+ /**
+ * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or
+ * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the
+ * applier changes the reported member state of the current node, and enables or suppresses
+ * electability of the current node. All modes but RS_SECONDARY indicate an unelectable
+ * follower state (one that cannot transition to candidate).
+ */
+ void setFollowerMode(MemberState::MS newMode);
/**
- * Constructs a Topology Coordinator object.
- **/
- TopologyCoordinatorImpl(Options options);
+ * Scan the memberData and determine the highest last applied or last
+ * durable optime present on a majority of servers; set _lastCommittedOpTime to this
+ * new entry.
+ * Whether the last applied or last durable op time is used depends on whether
+ * the config getWriteConcernMajorityShouldJournal is set.
+ * Returns true if the _lastCommittedOpTime was changed.
+ */
+ bool updateLastCommittedOpTime();
+
+ /**
+ * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the
+ * current last committed OpTime. Returns true if _lastCommittedOpTime is changed.
+ */
+ bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
+
+ /**
+ * Returns the OpTime of the latest majority-committed op known to this server.
+ */
+ OpTime getLastCommittedOpTime() const;
+
+ /**
+ * Returns true if it's safe to transition to LeaderMode::kMaster.
+ */
+ bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const;
+
+ /**
+ * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes
+ * and are ready to fully become primary and start accepting writes.
+ * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed
+ * for this tenure as primary. This prevents entries from before our election from counting as
+ * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed.
+ * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes.
+ */
+ Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm);
+
+ /**
+ * Adjusts the maintenance mode count by "inc".
+ *
+ * It is an error to call this method if getRole() does not return Role::follower.
+ * It is an error to allow the maintenance count to go negative.
+ */
+ void adjustMaintenanceCountBy(int inc);
////////////////////////////////////////////////////////////
//
- // Implementation of TopologyCoordinator interface
+ // Methods that prepare responses to command requests.
//
////////////////////////////////////////////////////////////
- virtual Role getRole() const;
- virtual MemberState getMemberState() const;
- virtual bool canAcceptWrites() const override;
- virtual bool isSteppingDown() const override;
- virtual HostAndPort getSyncSourceAddress() const;
- virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
- virtual int getMaintenanceCount() const;
- virtual long long getTerm();
- virtual UpdateTermResult updateTerm(long long term, Date_t now);
- virtual void setForceSyncSourceIndex(int index);
- virtual HostAndPort chooseNewSyncSource(Date_t now,
- const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
- virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
- virtual void clearSyncSourceBlacklist();
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- boost::optional<rpc::OplogQueryMetadata> oqMetadata,
- Date_t now) const;
- virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
- virtual void setElectionSleepUntil(Date_t newTime);
- virtual void setFollowerMode(MemberState::MS newMode);
- virtual bool updateLastCommittedOpTime();
- virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
- virtual OpTime getLastCommittedOpTime() const;
- virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override;
- virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override;
- virtual void adjustMaintenanceCountBy(int inc);
- virtual void prepareSyncFromResponse(const HostAndPort& target,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual Status prepareHeartbeatResponse(Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual Status prepareHeartbeatResponseV1(Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
- BSONObjBuilder* response,
- Status* result);
- virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
+ // produces a reply to a replSetSyncFrom command
+ void prepareSyncFromResponse(const HostAndPort& target,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a replSetFresh command
+ void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
+ Date_t now,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a received electCmd
+ void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
+ Date_t now,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a heartbeat
+ Status prepareHeartbeatResponse(Date_t now,
+ const ReplSetHeartbeatArgs& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response);
+
+ // produce a reply to a V1 heartbeat
+ Status prepareHeartbeatResponseV1(Date_t now,
+ const ReplSetHeartbeatArgsV1& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response);
+
+ struct ReplSetStatusArgs {
+ Date_t now;
+ unsigned selfUptime;
+ const OpTime& readConcernMajorityOpTime;
+ const BSONObj& initialSyncStatus;
+ };
+
+ // produce a reply to a status request
+ void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // Produce a replSetUpdatePosition command to be sent to the node's sync source.
+ StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
OpTime currentCommittedSnapshotOpTime) const;
- virtual void fillIsMasterForReplSet(IsMasterResponse* response);
- virtual void fillMemberData(BSONObjBuilder* result);
- virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
- int secs,
- BSONObjBuilder* response);
- virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
- virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
+ // produce a reply to an ismaster request. It is only valid to call this if we are a
+ // replset.
+ void fillIsMasterForReplSet(IsMasterResponse* response);
+
+ // Produce member data for the serverStatus command and diagnostic logging.
+ void fillMemberData(BSONObjBuilder* result);
+
+ enum class PrepareFreezeResponseResult { kNoAction, kElectSelf };
+
+ /**
+     * Produce a reply to a freeze request. Returns a PrepareFreezeResponseResult on success that
+ * may trigger state changes in the caller.
+ */
+ StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
+ int secs,
+ BSONObjBuilder* response);
+
+ ////////////////////////////////////////////////////////////
+ //
+ // Methods for sending and receiving heartbeats,
+ // reconfiguring and handling the results of standing for
+ // election.
+ //
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Updates the topology coordinator's notion of the replica set configuration.
+ *
+ * "newConfig" is the new configuration, and "selfIndex" is the index of this
+ * node's configuration information in "newConfig", or "selfIndex" is -1 to
+ * indicate that this node is not a member of "newConfig".
+ *
+ * newConfig.isInitialized() should be true, though implementations may accept
+ * configurations where this is not true, for testing purposes.
+ */
+ void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
+
+ /**
+ * Prepares a heartbeat request appropriate for sending to "target", assuming the
+ * current time is "now". "ourSetName" is used as the name for our replica set if
+ * the topology coordinator does not have a valid configuration installed.
+ *
+ * The returned pair contains proper arguments for a replSetHeartbeat command, and
+ * an amount of time to wait for the response.
+ *
+ * This call should be paired (with intervening network communication) with a call to
+ * processHeartbeatResponse for the same "target".
+ */
+ std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
+ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual HeartbeatResponseAction processHeartbeatResponse(
+
+ /**
+ * Processes a heartbeat response from "target" that arrived around "now", having
+ * spent "networkRoundTripTime" millis on the network.
+ *
+ * Updates internal topology coordinator state, and returns instructions about what action
+ * to take next.
+ *
+ * If the next action indicates StartElection, the topology coordinator has transitioned to
+ * the "candidate" role, and will remain there until processWinElection or
+ * processLoseElection are called.
+ *
+ * If the next action indicates "StepDownSelf", the topology coordinator has transitioned
+ * to the "follower" role from "leader", and the caller should take any necessary actions
+ * to become a follower.
+ *
+ * If the next action indicates "StepDownRemotePrimary", the caller should take steps to
+ * cause the specified remote host to step down from primary to secondary.
+ *
+ * If the next action indicates "Reconfig", the caller should verify the configuration in
+ * hbResponse is acceptable, perform any other reconfiguration actions it must, and call
+ * updateConfig with the new configuration and the appropriate value for "selfIndex". It
+ * must also wrap up any outstanding elections (by calling processLoseElection or
+ * processWinElection) before calling updateConfig.
+ *
+ * This call should be paired (with intervening network communication) with a call to
+ * prepareHeartbeatRequest for the same "target".
+ */
+ HeartbeatResponseAction processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
- virtual bool voteForMyself(Date_t now);
- virtual void setElectionInfo(OID electionId, Timestamp electionOpTime);
- virtual void processWinElection(OID electionId, Timestamp electionOpTime);
- virtual void processLoseElection();
- virtual Status checkShouldStandForElection(Date_t now) const;
- virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message);
- virtual bool attemptStepDown(long long termAtStart,
- Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force) override;
- virtual bool isSafeToStepDown() override;
- virtual bool prepareForUnconditionalStepDown() override;
- virtual Status prepareForStepDownAttempt() override;
- virtual void abortAttemptedStepDownIfNeeded() override;
- virtual void finishUnconditionalStepDown() override;
- virtual Date_t getStepDownTime() const;
- virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
- virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
- virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response);
- virtual void summarizeAsHtml(ReplSetHtmlSummary* output);
- virtual void loadLastVote(const LastVote& lastVote);
- virtual void voteForMyselfV1();
- virtual void setPrimaryIndex(long long primaryIndex);
- virtual int getCurrentPrimaryIndex() const;
- virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
- virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
- const ReplSetTagPattern& tagPattern,
- bool durablyWritten);
- virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
- bool durablyWritten,
- bool skipSelf);
- virtual bool setMemberAsDown(Date_t now, const int memberIndex) override;
- virtual std::pair<int, Date_t> getStalestLiveMember() const;
- virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now);
- virtual void resetAllMemberTimeouts(Date_t now);
- virtual void resetMemberTimeouts(Date_t now,
- const stdx::unordered_set<HostAndPort>& member_set);
- virtual OpTime getMyLastAppliedOpTime() const;
- virtual OpTime getMyLastDurableOpTime() const;
- virtual MemberData* getMyMemberData();
- virtual MemberData* findMemberDataByMemberId(const int memberId);
- virtual MemberData* findMemberDataByRid(const OID rid);
- virtual MemberData* addSlaveMemberData(const OID rid);
- virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
- virtual void setStorageEngineSupportsReadCommitted(bool supported);
-
- virtual void restartHeartbeats();
-
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
+
+ /**
+ * Returns whether or not at least 'numNodes' have reached the given opTime.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ */
+ bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
+
+ /**
+ * Returns whether or not at least one node matching the tagPattern has reached
+ * the given opTime.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ */
+ bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
+ const ReplSetTagPattern& tagPattern,
+ bool durablyWritten);
+
+ /**
+ * Returns a vector of members that have applied the operation with OpTime 'op'.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ * "skipSelf" means to exclude this node whether or not the op has been applied.
+ */
+ std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
+ bool durablyWritten,
+ bool skipSelf);
+
+ /**
+ * Marks a member as down from our perspective and returns a bool which indicates if we can no
+ * longer see a majority of the nodes and thus should step down.
+ */
+ bool setMemberAsDown(Date_t now, const int memberIndex);
+
+ /**
+ * Goes through the memberData and determines which member that is currently live
+ * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are
+ * no other members.
+ */
+ std::pair<int, Date_t> getStalestLiveMember() const;
+
+ /**
+ * Go through the memberData, and mark nodes which haven't been updated
+ * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which
+ * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction.
+ */
+ HeartbeatResponseAction checkMemberTimeouts(Date_t now);
+
+ /**
+ * Set all nodes in memberData to not stale with a lastUpdate of "now".
+ */
+ void resetAllMemberTimeouts(Date_t now);
+
+ /**
+ * Set all nodes in memberData that are present in member_set
+ * to not stale with a lastUpdate of "now".
+ */
+ void resetMemberTimeouts(Date_t now, const stdx::unordered_set<HostAndPort>& member_set);
+
+ /*
+ * Returns the last optime that this node has applied, whether or not it has been journaled.
+ */
+ OpTime getMyLastAppliedOpTime() const;
+
+ /*
+ * Returns the last optime that this node has applied and journaled.
+ */
+ OpTime getMyLastDurableOpTime() const;
+
+ /*
+ * Returns information we have on the state of this node.
+ */
+ MemberData* getMyMemberData();
+
+ /*
+ * Returns information we have on the state of the node identified by memberId. Returns
+ * nullptr if memberId is not found in the configuration.
+ */
+ MemberData* findMemberDataByMemberId(const int memberId);
+
+ /*
+ * Returns information we have on the state of the node identified by rid. Returns
+ * nullptr if rid is not found in the heartbeat data. This method is used only for
+ * master/slave replication.
+ */
+ MemberData* findMemberDataByRid(const OID rid);
+
+ /*
+ * Adds and returns a memberData entry for the given RID.
+ * Used only in master/slave mode.
+ */
+ MemberData* addSlaveMemberData(const OID rid);
+
+ /**
+ * If getRole() == Role::candidate and this node has not voted too recently, updates the
+ * lastVote tracker and returns true. Otherwise, returns false.
+ */
+ bool voteForMyself(Date_t now);
+
+ /**
+ * Sets lastVote to be for ourself in this term.
+ */
+ void voteForMyselfV1();
+
+ /**
+ * Sets election id and election optime.
+ */
+ void setElectionInfo(OID electionId, Timestamp electionOpTime);
+
+ /**
+ * Performs state updates associated with winning an election.
+ *
+ * It is an error to call this if the topology coordinator is not in candidate mode.
+ *
+ * Exactly one of either processWinElection or processLoseElection must be called if
+ * processHeartbeatResponse returns StartElection, to exit candidate mode.
+ */
+ void processWinElection(OID electionId, Timestamp electionOpTime);
+
+ /**
+ * Performs state updates associated with losing an election.
+ *
+ * It is an error to call this if the topology coordinator is not in candidate mode.
+ *
+ * Exactly one of either processWinElection or processLoseElection must be called if
+ * processHeartbeatResponse returns StartElection, to exit candidate mode.
+ */
+ void processLoseElection();
+
+ /**
+ * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used
+ * when we receive a stepdown command (which can fail if not enough secondaries are caught up)
+ * to ensure that we never process more than one stepdown request at a time.
+ * Returns OK if it is safe to continue with the stepdown attempt, or returns
+     * ConflictingOperationInProgress if this node is already processing a stepdown request of any
+ * kind.
+ */
+ Status prepareForStepDownAttempt();
+
+ /**
+ * If this node is still attempting to process a stepdown attempt, aborts the attempt and
+ * returns this node to normal primary/master state. If this node has already completed
+ * stepping down or is now in the process of handling an unconditional stepdown, then this
+ * method does nothing.
+ */
+ void abortAttemptedStepDownIfNeeded();
+
+ /**
+ * Tries to transition the coordinator from the leader role to the follower role.
+ *
+ * A step down succeeds based on the following conditions:
+ *
+ * C1. 'force' is true and now > waitUntil
+ *
+ * C2. A majority set of nodes, M, in the replica set have optimes greater than or
+ * equal to the last applied optime of the primary.
+ *
+ * C3. There exists at least one electable secondary node in the majority set M.
+ *
+ *
+ * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method
+ * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more
+ * time to pass could make it succeed, returns false. If the whole stepdown attempt should be
+ * abandoned (for example because the time limit expired or because we've already stepped down),
+ * throws an exception.
+ * TODO(spencer): Unify with the finishUnconditionalStepDown() method.
+ */
+ bool attemptStepDown(
+ long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force);
+
+ /**
+ * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument.
+ * This is essentially checking conditions C2 and C3 as described in the comment to
+ * attemptStepDown().
+ */
+ bool isSafeToStepDown();
+
+ /**
+ * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process
+ * of an unconditional step down. If we are in the middle of a stepdown command attempt when
+ * this is called then this unconditional stepdown will supersede the stepdown attempt, which
+ * will cause the stepdown to fail. When this returns true it must be followed by a call to
+ * finishUnconditionalStepDown() that is called when holding the global X lock.
+ */
+ bool prepareForUnconditionalStepDown();
+
+ /**
+ * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the
+ * global exclusive lock so we can't actually stepdown at that moment. When that happens
+ * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown())
+ * and schedule work to stepdown in the global X lock. This method is called after holding the
+ * global lock to perform the actual stepdown.
+ * TODO(spencer): Unify with the finishAttemptedStepDown() method.
+ */
+ void finishUnconditionalStepDown();
+
+ /**
+     * Considers whether or not this node should stand for election. Returns Status::OK if the
+     * node is currently eligible to stand for election, or a non-OK Status describing why not.
+ */
+ Status checkShouldStandForElection(Date_t now) const;
+
+ /**
+ * Set the outgoing heartbeat message from self
+ */
+ void setMyHeartbeatMessage(const Date_t now, const std::string& s);
+
+ /**
+ * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp
+ * information.
+ */
+ rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
+
+ /**
+ * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary,
+ * lastOpApplied, and lastOpCommitted.
+ */
+ rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
+
+ /**
+ * Writes into 'output' all the information needed to generate a summary of the current
+ * replication state for use by the web interface.
+ */
+ void summarizeAsHtml(ReplSetHtmlSummary* output);
+
+ /**
+ * Prepares a ReplSetRequestVotesResponse.
+ */
+ void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response);
+
+ /**
+ * Loads an initial LastVote document, which was read from local storage.
+ *
+ * Called only during replication startup. All other updates are done internally.
+ */
+ void loadLastVote(const LastVote& lastVote);
+
+ /**
+ * Updates the current primary index.
+ */
+ void setPrimaryIndex(long long primaryIndex);
+
+ /**
+ * Returns the current primary index.
+ */
+ int getCurrentPrimaryIndex() const;
+
+ enum StartElectionReason {
+ kElectionTimeout,
+ kPriorityTakeover,
+ kStepUpRequest,
+ kCatchupTakeover
+ };
+
+ /**
+ * Transitions to the candidate role if the node is electable.
+ */
+ Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
+
+ /**
+ * Updates the storage engine read committed support in the TopologyCoordinator options after
+ * creation.
+ */
+ void setStorageEngineSupportsReadCommitted(bool supported);
+
+ /**
+ * Reset the booleans to record the last heartbeat restart.
+ */
+ void restartHeartbeats();
+
+ /**
+ * Scans through all members that are 'up' and return the latest known optime, if we have
+ * received (successful or failed) heartbeats from all nodes since heartbeat restart.
+ *
+ * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted
+ * heartbeats.
+ * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down.
+ */
+ boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
////////////////////////////////////////////////////////////
//
@@ -289,6 +783,9 @@ public:
OID getElectionId() const;
private:
+ typedef int UnelectableReasonMask;
+ class PingStats;
+
enum UnelectableReason {
None = 0,
CannotSeeMajority = 1 << 0,
@@ -304,8 +801,6 @@ private:
NotCloseEnoughToLatestForPriorityTakeover = 1 << 10,
NotFreshEnoughForCatchupTakeover = 1 << 11,
};
- typedef int UnelectableReasonMask;
-
// Set what type of PRIMARY this node currently is.
void _setLeaderMode(LeaderMode mode);
@@ -525,5 +1020,81 @@ private:
ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
};
+/**
+ * Represents a latency measurement for each replica set member based on heartbeat requests.
+ * The measurement is an average weighted 80% to the old value, and 20% to the new value.
+ *
+ * Also stores information about heartbeat progress and retries.
+ */
+class TopologyCoordinator::PingStats {
+public:
+ /**
+ * Records that a new heartbeat request started at "now".
+ *
+ * This resets the failure count used in determining whether the next request to a target
+ * should be a retry or a regularly scheduled heartbeat message.
+ */
+ void start(Date_t now);
+
+ /**
+ * Records that a heartbeat request completed successfully, and that "millis" milliseconds
+ * were spent for a single network roundtrip plus remote processing time.
+ */
+ void hit(Milliseconds millis);
+
+ /**
+ * Records that a heartbeat request failed.
+ */
+ void miss();
+
+ /**
+ * Gets the number of hit() calls, i.e. the number of successful heartbeat responses
+ * recorded for this target.
+ */
+ unsigned int getCount() const {
+ return count;
+ }
+
+ /**
+ * Gets the weighted average round trip time for heartbeat messages to the target.
+ * Returns 0 if there have been no pings recorded yet (i.e. 'value' is still the
+ * UninitializedPing sentinel).
+ */
+ Milliseconds getMillis() const {
+ return value == UninitializedPing ? Milliseconds(0) : value;
+ }
+
+ /**
+ * Gets the date at which start() was last called, which is used to determine if
+ * a heartbeat should be retried or if the time limit has expired.
+ */
+ Date_t getLastHeartbeatStartDate() const {
+ return _lastHeartbeatStartDate;
+ }
+
+ /**
+ * Gets the number of failures since start() was last called.
+ *
+ * This value is incremented by calls to miss(), cleared by calls to start() and
+ * set to the maximum possible value by calls to hit().
+ */
+ int getNumFailuresSinceLastStart() const {
+ return _numFailuresSinceLastStart;
+ }
+
+private:
+ // Sentinel meaning "no ping recorded yet"; see getMillis() above.
+ static constexpr Milliseconds UninitializedPing{-1};
+
+ // NOTE(review): 'count' and 'value' lack the leading-underscore convention used by the
+ // other private members below -- consider renaming for consistency.
+ unsigned int count = 0;
+ Milliseconds value = UninitializedPing;
+ Date_t _lastHeartbeatStartDate;
+ // Starts at the maximum so that, before any start() call, the stats do not look like an
+ // in-progress heartbeat retry; hit() also restores this maximum (see getter comment).
+ int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
+};
+
+//
+// Convenience stream-output operators for unittest code, producing human-readable forms
+// of internal enums. Please use accessors otherwise.
+//
+
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role);
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result);
+
} // namespace repl
} // namespace mongo