diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-27 10:32:07 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-28 15:59:13 -0500 |
commit | 26b42901e302db19cfbdbf3726b7be6fb624f4f5 (patch) | |
tree | 3e1a3e8f57c584ea3131b6fda11d84531b3077eb /src/mongo/db/repl/topology_coordinator.h | |
parent | e38cf83f058f2e29e41c4fe2f02b7fbc405ceb8c (diff) | |
download | mongo-26b42901e302db19cfbdbf3726b7be6fb624f4f5.tar.gz |
SERVER-30626 Remove TopologyCoordinator interface.
Diffstat (limited to 'src/mongo/db/repl/topology_coordinator.h')
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 931 |
1 file changed, 751 insertions, 180 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 046ff2217a6..eca071e590a 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -28,241 +28,735 @@ #pragma once +#include <iosfwd> #include <string> -#include <vector> -#include "mongo/bson/timestamp.h" +#include "mongo/base/disallow_copying.h" #include "mongo/db/repl/last_vote.h" -#include "mongo/db/repl/member_data.h" -#include "mongo/db/repl/member_state.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/server_options.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { - -class OperationContext; - -namespace rpc { -class ReplSetMetadata; -} // namespace rpc +class Timestamp; namespace repl { - -static const Milliseconds UninitializedPing{-1}; +class HeartbeatResponseAction; +class MemberData; +class OpTime; +class ReplSetHeartbeatArgs; +class ReplSetConfig; +class TagSubgroup; +struct MemberState; /** - * Represents a latency measurement for each replica set member based on heartbeat requests. - * The measurement is an average weighted 80% to the old value, and 20% to the new value. + * Replication Topology Coordinator * - * Also stores information about heartbeat progress and retries. + * This object is responsible for managing the topology of the cluster. + * Tasks include consensus and leader election, chaining, and configuration management. + * Methods of this class should be non-blocking. */ -class PingStats { +class TopologyCoordinator { + MONGO_DISALLOW_COPYING(TopologyCoordinator); + public: /** - * Records that a new heartbeat request started at "now". 
+ * Type that denotes the role of a node in the replication protocol. * - * This resets the failure count used in determining whether the next request to a target - * should be a retry or a regularly scheduled heartbeat message. + * The role is distinct from MemberState, in that it only deals with the + * roles a node plays in the basic protocol -- leader, follower and candidate. + * The mapping between MemberState and Role is complex -- several MemberStates + * map to the follower role, and MemberState::RS_SECONDARY maps to either + * follower or candidate roles, e.g. */ - void start(Date_t now); + class Role { + public: + /** + * Constant indicating leader role. + */ + static const Role leader; + + /** + * Constant indicating follower role. + */ + static const Role follower; + + /** + * Constant indicating candidate role + */ + static const Role candidate; + + Role() {} + + bool operator==(Role other) const { + return _value == other._value; + } + bool operator!=(Role other) const { + return _value != other._value; + } + + std::string toString() const; + + private: + explicit Role(int value); + + int _value; + }; + + struct Options { + // A sync source is re-evaluated after it lags behind further than this amount. + Seconds maxSyncSourceLagSecs{0}; + + // Whether or not this node is running as a config server. + ClusterRole clusterRole{ClusterRole::None}; + }; /** - * Records that a heartbeat request completed successfully, and that "millis" milliseconds - * were spent for a single network roundtrip plus remote processing time. + * Constructs a Topology Coordinator object. + **/ + TopologyCoordinator(Options options); + + + ~TopologyCoordinator(); + + /** + * Different modes a node can be in while still reporting itself as in state PRIMARY. 
+ * + * Valid transitions: + * + * kNotLeader <---------------------------------- + * | | + * | | + * | | + * v | + * kLeaderElect----- | + * | | | + * | | | + * v | | + * kMaster ------------------------- | + * | ^ | | | + * | | ------------------- | | + * | | | | | | + * v | v v v | + * kAttemptingStepDown----------->kSteppingDown | + * | | | + * | | | + * | | | + * --------------------------------------------- + * */ - void hit(Milliseconds millis); + enum class LeaderMode { + kNotLeader, // This node is not currently a leader. + kLeaderElect, // This node has been elected leader, but can't yet accept writes. + kMaster, // This node reports ismaster:true and can accept writes. + kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete. + kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail. + }; + + //////////////////////////////////////////////////////////// + // + // State inspection methods. + // + //////////////////////////////////////////////////////////// /** - * Records that a heartbeat request failed. + * Gets the role of this member in the replication protocol. */ - void miss(); + Role getRole() const; /** - * Gets the number of hit() calls. + * Gets the MemberState of this member in the replica set. */ - unsigned int getCount() const { - return count; - } + MemberState getMemberState() const; /** - * Gets the weighted average round trip time for heartbeat messages to the target. - * Returns 0 if there have been no pings recorded yet. + * Returns whether this node should be allowed to accept writes. */ - Milliseconds getMillis() const { - return value == UninitializedPing ? Milliseconds(0) : value; - } + bool canAcceptWrites() const; /** - * Gets the date at which start() was last called, which is used to determine if - * a heartbeat should be retried or if the time limit has expired. + * Returns true if this node is in the process of stepping down. 
Note that this can be + * due to an unconditional stepdown that must succeed (for instance from learning about a new + * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that + * could fail if not enough nodes are caught up). */ - Date_t getLastHeartbeatStartDate() const { - return _lastHeartbeatStartDate; - } + bool isSteppingDown() const; /** - * Gets the number of failures since start() was last called. + * Returns the address of the current sync source, or an empty HostAndPort if there is no + * current sync source. + */ + HostAndPort getSyncSourceAddress() const; + + /** + * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor + * ourself. + */ + std::vector<HostAndPort> getMaybeUpHostAndPorts() const; + + /** + * Gets the earliest time the current node will stand for election. + */ + Date_t getStepDownTime() const; + + /** + * Gets the current value of the maintenance mode counter. + */ + int getMaintenanceCount() const; + + /** + * Gets the latest term this member is aware of. If this member is the primary, + * it's the current term of the replica set. + */ + long long getTerm() const; + + enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm }; + + //////////////////////////////////////////////////////////// + // + // Basic state manipulation methods. + // + //////////////////////////////////////////////////////////// + + /** + * Sets the latest term this member is aware of to the higher of its current value and + * the value passed in as "term". + * Returns the result of setting the term value, or if a stepdown should be triggered. 
+ */ + UpdateTermResult updateTerm(long long term, Date_t now); + + /** + * Sets the index into the config used when we next choose a sync source + */ + void setForceSyncSourceIndex(int index); + + enum class ChainingPreference { kAllowChaining, kUseConfiguration }; + + /** + * Chooses and sets a new sync source, based on our current knowledge of the world. + */ + HostAndPort chooseNewSyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ChainingPreference chainingPreference); + + /** + * Suppresses selecting "host" as sync source until "until". + */ + void blacklistSyncSource(const HostAndPort& host, Date_t until); + + /** + * Removes a single entry "host" from the list of potential sync sources which we + * have blacklisted, if it is supposed to be unblacklisted by "now". + */ + void unblacklistSyncSource(const HostAndPort& host, Date_t now); + + /** + * Clears the list of potential sync sources we have blacklisted. + */ + void clearSyncSourceBlacklist(); + + /** + * Determines if a new sync source should be chosen, if a better candidate sync source is + * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than + * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are + * running in ProtocolVersion 1, our current sync source is not primary, has no sync source + * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. * - * This value is incremented by calls to miss(), cleared by calls to start() and - * set to the maximum possible value by calls to hit(). + * "now" is used to skip over currently blacklisted sync sources. + * + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. 
*/ - int getNumFailuresSinceLastStart() const { - return _numFailuresSinceLastStart; - } + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, + Date_t now) const; -private: - unsigned int count = 0; - Milliseconds value = UninitializedPing; - Date_t _lastHeartbeatStartDate; - int _numFailuresSinceLastStart = std::numeric_limits<int>::max(); -}; + /** + * Checks whether we are a single node set and we are not in a stepdown period. If so, + * puts us into candidate mode, otherwise does nothing. This is used to ensure that + * nodes in a single node replset become primary again when their stepdown period ends. + */ + bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); -class TopologyCoordinatorImpl : public TopologyCoordinator { -public: - struct Options { - // A sync source is re-evaluated after it lags behind further than this amount. - Seconds maxSyncSourceLagSecs{0}; + /** + * Sets the earliest time the current node will stand for election to "newTime". + * + * Until this time, while the node may report itself as electable, it will not stand + * for election. + */ + void setElectionSleepUntil(Date_t newTime); - // Whether or not this node is running as a config server. - ClusterRole clusterRole{ClusterRole::None}; - }; + /** + * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or + * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the + * applier changes the reported member state of the current node, and enables or suppresses + * electability of the current node. All modes but RS_SECONDARY indicate an unelectable + * follower state (one that cannot transition to candidate). + */ + void setFollowerMode(MemberState::MS newMode); /** - * Constructs a Topology Coordinator object. 
- **/ - TopologyCoordinatorImpl(Options options); + * Scan the memberData and determine the highest last applied or last + * durable optime present on a majority of servers; set _lastCommittedOpTime to this + * new entry. + * Whether the last applied or last durable op time is used depends on whether + * the config getWriteConcernMajorityShouldJournal is set. + * Returns true if the _lastCommittedOpTime was changed. + */ + bool updateLastCommittedOpTime(); + + /** + * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the + * current last committed OpTime. Returns true if _lastCommittedOpTime is changed. + */ + bool advanceLastCommittedOpTime(const OpTime& committedOpTime); + + /** + * Returns the OpTime of the latest majority-committed op known to this server. + */ + OpTime getLastCommittedOpTime() const; + + /** + * Returns true if it's safe to transition to LeaderMode::kMaster. + */ + bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const; + + /** + * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes + * and are ready to fully become primary and start accepting writes. + * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed + * for this tenure as primary. This prevents entries from before our election from counting as + * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed. + * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes. + */ + Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm); + + /** + * Adjusts the maintenance mode count by "inc". + * + * It is an error to call this method if getRole() does not return Role::follower. + * It is an error to allow the maintenance count to go negative. 
+ */ + void adjustMaintenanceCountBy(int inc); //////////////////////////////////////////////////////////// // - // Implementation of TopologyCoordinator interface + // Methods that prepare responses to command requests. // //////////////////////////////////////////////////////////// - virtual Role getRole() const; - virtual MemberState getMemberState() const; - virtual bool canAcceptWrites() const override; - virtual bool isSteppingDown() const override; - virtual HostAndPort getSyncSourceAddress() const; - virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; - virtual int getMaintenanceCount() const; - virtual long long getTerm(); - virtual UpdateTermResult updateTerm(long long term, Date_t now); - virtual void setForceSyncSourceIndex(int index); - virtual HostAndPort chooseNewSyncSource(Date_t now, - const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); - virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); - virtual void clearSyncSourceBlacklist(); - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const; - virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); - virtual void setElectionSleepUntil(Date_t newTime); - virtual void setFollowerMode(MemberState::MS newMode); - virtual bool updateLastCommittedOpTime(); - virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime); - virtual OpTime getLastCommittedOpTime() const; - virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override; - virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override; - virtual void adjustMaintenanceCountBy(int inc); - virtual void prepareSyncFromResponse(const HostAndPort& target, - BSONObjBuilder* response, - Status* 
result); - virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual Status prepareHeartbeatResponse(Date_t now, - const ReplSetHeartbeatArgs& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual Status prepareHeartbeatResponseV1(Date_t now, - const ReplSetHeartbeatArgsV1& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, - BSONObjBuilder* response, - Status* result); - virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( + // produces a reply to a replSetSyncFrom command + void prepareSyncFromResponse(const HostAndPort& target, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a replSetFresh command + void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, + Date_t now, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a received electCmd + void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, + Date_t now, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a heartbeat + Status prepareHeartbeatResponse(Date_t now, + const ReplSetHeartbeatArgs& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response); + + // produce a reply to a V1 heartbeat + Status prepareHeartbeatResponseV1(Date_t now, + const ReplSetHeartbeatArgsV1& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response); + + struct ReplSetStatusArgs { + Date_t now; + unsigned selfUptime; + const OpTime& readConcernMajorityOpTime; + const BSONObj& initialSyncStatus; + }; + + // produce a reply to a status request + void prepareStatusResponse(const 
ReplSetStatusArgs& rsStatusArgs, + BSONObjBuilder* response, + Status* result); + + // Produce a replSetUpdatePosition command to be sent to the node's sync source. + StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, OpTime currentCommittedSnapshotOpTime) const; - virtual void fillIsMasterForReplSet(IsMasterResponse* response); - virtual void fillMemberData(BSONObjBuilder* result); - virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, - int secs, - BSONObjBuilder* response); - virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); - virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( + // produce a reply to an ismaster request. It is only valid to call this if we are a + // replset. + void fillIsMasterForReplSet(IsMasterResponse* response); + + // Produce member data for the serverStatus command and diagnostic logging. + void fillMemberData(BSONObjBuilder* result); + + enum class PrepareFreezeResponseResult { kNoAction, kElectSelf }; + + /** + * Produce a reply to a freeze request. Returns a PostMemberStateUpdateAction on success that + * may trigger state changes in the caller. + */ + StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, + int secs, + BSONObjBuilder* response); + + //////////////////////////////////////////////////////////// + // + // Methods for sending and receiving heartbeats, + // reconfiguring and handling the results of standing for + // election. + // + //////////////////////////////////////////////////////////// + + /** + * Updates the topology coordinator's notion of the replica set configuration. + * + * "newConfig" is the new configuration, and "selfIndex" is the index of this + * node's configuration information in "newConfig", or "selfIndex" is -1 to + * indicate that this node is not a member of "newConfig". 
+ * + * newConfig.isInitialized() should be true, though implementations may accept + * configurations where this is not true, for testing purposes. + */ + void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); + + /** + * Prepares a heartbeat request appropriate for sending to "target", assuming the + * current time is "now". "ourSetName" is used as the name for our replica set if + * the topology coordinator does not have a valid configuration installed. + * + * The returned pair contains proper arguments for a replSetHeartbeat command, and + * an amount of time to wait for the response. + * + * This call should be paired (with intervening network communication) with a call to + * processHeartbeatResponse for the same "target". + */ + std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( + std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual HeartbeatResponseAction processHeartbeatResponse( + + /** + * Processes a heartbeat response from "target" that arrived around "now", having + * spent "networkRoundTripTime" millis on the network. + * + * Updates internal topology coordinator state, and returns instructions about what action + * to take next. + * + * If the next action indicates StartElection, the topology coordinator has transitioned to + * the "candidate" role, and will remain there until processWinElection or + * processLoseElection are called. + * + * If the next action indicates "StepDownSelf", the topology coordinator has transitioned + * to the "follower" role from "leader", and the caller should take any necessary actions + * to become a follower. 
+ * + * If the next action indicates "StepDownRemotePrimary", the caller should take steps to + * cause the specified remote host to step down from primary to secondary. + * + * If the next action indicates "Reconfig", the caller should verify the configuration in + * hbResponse is acceptable, perform any other reconfiguration actions it must, and call + * updateConfig with the new configuration and the appropriate value for "selfIndex". It + * must also wrap up any outstanding elections (by calling processLoseElection or + * processWinElection) before calling updateConfig. + * + * This call should be paired (with intervening network communication) with a call to + * prepareHeartbeatRequest for the same "target". + */ + HeartbeatResponseAction processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, const StatusWith<ReplSetHeartbeatResponse>& hbResponse); - virtual bool voteForMyself(Date_t now); - virtual void setElectionInfo(OID electionId, Timestamp electionOpTime); - virtual void processWinElection(OID electionId, Timestamp electionOpTime); - virtual void processLoseElection(); - virtual Status checkShouldStandForElection(Date_t now) const; - virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message); - virtual bool attemptStepDown(long long termAtStart, - Date_t now, - Date_t waitUntil, - Date_t stepDownUntil, - bool force) override; - virtual bool isSafeToStepDown() override; - virtual bool prepareForUnconditionalStepDown() override; - virtual Status prepareForStepDownAttempt() override; - virtual void abortAttemptedStepDownIfNeeded() override; - virtual void finishUnconditionalStepDown() override; - virtual Date_t getStepDownTime() const; - virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const; - virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const; - virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - 
ReplSetRequestVotesResponse* response); - virtual void summarizeAsHtml(ReplSetHtmlSummary* output); - virtual void loadLastVote(const LastVote& lastVote); - virtual void voteForMyselfV1(); - virtual void setPrimaryIndex(long long primaryIndex); - virtual int getCurrentPrimaryIndex() const; - virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); - virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime, - const ReplSetTagPattern& tagPattern, - bool durablyWritten); - virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, - bool durablyWritten, - bool skipSelf); - virtual bool setMemberAsDown(Date_t now, const int memberIndex) override; - virtual std::pair<int, Date_t> getStalestLiveMember() const; - virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now); - virtual void resetAllMemberTimeouts(Date_t now); - virtual void resetMemberTimeouts(Date_t now, - const stdx::unordered_set<HostAndPort>& member_set); - virtual OpTime getMyLastAppliedOpTime() const; - virtual OpTime getMyLastDurableOpTime() const; - virtual MemberData* getMyMemberData(); - virtual MemberData* findMemberDataByMemberId(const int memberId); - virtual MemberData* findMemberDataByRid(const OID rid); - virtual MemberData* addSlaveMemberData(const OID rid); - virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason); - virtual void setStorageEngineSupportsReadCommitted(bool supported); - - virtual void restartHeartbeats(); - - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; + + /** + * Returns whether or not at least 'numNodes' have reached the given opTime. + * "durablyWritten" indicates whether the operation has to be durably applied. + */ + bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); + + /** + * Returns whether or not at least one node matching the tagPattern has reached + * the given opTime. 
+ * "durablyWritten" indicates whether the operation has to be durably applied. + */ + bool haveTaggedNodesReachedOpTime(const OpTime& opTime, + const ReplSetTagPattern& tagPattern, + bool durablyWritten); + + /** + * Returns a vector of members that have applied the operation with OpTime 'op'. + * "durablyWritten" indicates whether the operation has to be durably applied. + * "skipSelf" means to exclude this node whether or not the op has been applied. + */ + std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, + bool durablyWritten, + bool skipSelf); + + /** + * Marks a member as down from our perspective and returns a bool which indicates if we can no + * longer see a majority of the nodes and thus should step down. + */ + bool setMemberAsDown(Date_t now, const int memberIndex); + + /** + * Goes through the memberData and determines which member that is currently live + * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are + * no other members. + */ + std::pair<int, Date_t> getStalestLiveMember() const; + + /** + * Go through the memberData, and mark nodes which haven't been updated + * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which + * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction. + */ + HeartbeatResponseAction checkMemberTimeouts(Date_t now); + + /** + * Set all nodes in memberData to not stale with a lastUpdate of "now". + */ + void resetAllMemberTimeouts(Date_t now); + + /** + * Set all nodes in memberData that are present in member_set + * to not stale with a lastUpdate of "now". + */ + void resetMemberTimeouts(Date_t now, const stdx::unordered_set<HostAndPort>& member_set); + + /* + * Returns the last optime that this node has applied, whether or not it has been journaled. + */ + OpTime getMyLastAppliedOpTime() const; + + /* + * Returns the last optime that this node has applied and journaled. 
+ */ + OpTime getMyLastDurableOpTime() const; + + /* + * Returns information we have on the state of this node. + */ + MemberData* getMyMemberData(); + + /* + * Returns information we have on the state of the node identified by memberId. Returns + * nullptr if memberId is not found in the configuration. + */ + MemberData* findMemberDataByMemberId(const int memberId); + + /* + * Returns information we have on the state of the node identified by rid. Returns + * nullptr if rid is not found in the heartbeat data. This method is used only for + * master/slave replication. + */ + MemberData* findMemberDataByRid(const OID rid); + + /* + * Adds and returns a memberData entry for the given RID. + * Used only in master/slave mode. + */ + MemberData* addSlaveMemberData(const OID rid); + + /** + * If getRole() == Role::candidate and this node has not voted too recently, updates the + * lastVote tracker and returns true. Otherwise, returns false. + */ + bool voteForMyself(Date_t now); + + /** + * Sets lastVote to be for ourself in this term. + */ + void voteForMyselfV1(); + + /** + * Sets election id and election optime. + */ + void setElectionInfo(OID electionId, Timestamp electionOpTime); + + /** + * Performs state updates associated with winning an election. + * + * It is an error to call this if the topology coordinator is not in candidate mode. + * + * Exactly one of either processWinElection or processLoseElection must be called if + * processHeartbeatResponse returns StartElection, to exit candidate mode. + */ + void processWinElection(OID electionId, Timestamp electionOpTime); + + /** + * Performs state updates associated with losing an election. + * + * It is an error to call this if the topology coordinator is not in candidate mode. + * + * Exactly one of either processWinElection or processLoseElection must be called if + * processHeartbeatResponse returns StartElection, to exit candidate mode. 
+ */ + void processLoseElection(); + + /** + * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used + * when we receive a stepdown command (which can fail if not enough secondaries are caught up) + * to ensure that we never process more than one stepdown request at a time. + * Returns OK if it is safe to continue with the stepdown attempt, or returns + * ConflictingOperationInProgess if this node is already processing a stepdown request of any + * kind. + */ + Status prepareForStepDownAttempt(); + + /** + * If this node is still attempting to process a stepdown attempt, aborts the attempt and + * returns this node to normal primary/master state. If this node has already completed + * stepping down or is now in the process of handling an unconditional stepdown, then this + * method does nothing. + */ + void abortAttemptedStepDownIfNeeded(); + + /** + * Tries to transition the coordinator from the leader role to the follower role. + * + * A step down succeeds based on the following conditions: + * + * C1. 'force' is true and now > waitUntil + * + * C2. A majority set of nodes, M, in the replica set have optimes greater than or + * equal to the last applied optime of the primary. + * + * C3. There exists at least one electable secondary node in the majority set M. + * + * + * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method + * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more + * time to pass could make it succeed, returns false. If the whole stepdown attempt should be + * abandoned (for example because the time limit expired or because we've already stepped down), + * throws an exception. + * TODO(spencer): Unify with the finishUnconditionalStepDown() method. 
+ */ + bool attemptStepDown( + long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force); + + /** + * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument. + * This is essentially checking conditions C2 and C3 as described in the comment to + * attemptStepDown(). + */ + bool isSafeToStepDown(); + + /** + * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process + * of an unconditional step down. If we are in the middle of a stepdown command attempt when + * this is called then this unconditional stepdown will supersede the stepdown attempt, which + * will cause the stepdown to fail. When this returns true it must be followed by a call to + * finishUnconditionalStepDown() that is called when holding the global X lock. + */ + bool prepareForUnconditionalStepDown(); + + /** + * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the + * global exclusive lock so we can't actually stepdown at that moment. When that happens + * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown()) + * and schedule work to stepdown in the global X lock. This method is called after holding the + * global lock to perform the actual stepdown. + * TODO(spencer): Unify with the finishAttemptedStepDown() method. + */ + void finishUnconditionalStepDown(); + + /** + * Considers whether or not this node should stand for election, and returns true + * if the node has transitioned to candidate role as a result of the call. + */ + Status checkShouldStandForElection(Date_t now) const; + + /** + * Set the outgoing heartbeat message from self + */ + void setMyHeartbeatMessage(const Date_t now, const std::string& s); + + /** + * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp + * information. 
+ */
+ rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
+
+ /**
+ * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary,
+ * lastOpApplied, and lastOpCommitted.
+ */
+ rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
+
+ /**
+ * Writes into 'output' all the information needed to generate a summary of the current
+ * replication state for use by the web interface.
+ */
+ void summarizeAsHtml(ReplSetHtmlSummary* output);
+
+ /**
+ * Prepares a ReplSetRequestVotesResponse.
+ */
+ void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response);
+
+ /**
+ * Loads an initial LastVote document, which was read from local storage.
+ *
+ * Called only during replication startup. All other updates are done internally.
+ */
+ void loadLastVote(const LastVote& lastVote);
+
+ /**
+ * Updates the current primary index.
+ */
+ void setPrimaryIndex(long long primaryIndex);
+
+ /**
+ * Returns the current primary index.
+ * NOTE(review): setPrimaryIndex() takes a long long while this returns int -- confirm the
+ * narrowing is intentional (i.e. member indexes always fit in int).
+ */
+ int getCurrentPrimaryIndex() const;
+
+ // The reason this node is attempting to start an election; passed to
+ // becomeCandidateIfElectable().
+ enum StartElectionReason {
+ kElectionTimeout,
+ kPriorityTakeover,
+ kStepUpRequest,
+ kCatchupTakeover
+ };
+
+ /**
+ * Transitions to the candidate role if the node is electable.
+ */
+ Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
+
+ /**
+ * Updates the storage engine read committed support in the TopologyCoordinator options after
+ * creation.
+ */
+ void setStorageEngineSupportsReadCommitted(bool supported);
+
+ /**
+ * Reset the booleans to record the last heartbeat restart.
+ */
+ void restartHeartbeats();
+
+ /**
+ * Scans through all members that are 'up' and return the latest known optime, if we have
+ * received (successful or failed) heartbeats from all nodes since heartbeat restart.
+ *
+ * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted
+ * heartbeats. 
+ * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down.
+ */
+ boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;

////////////////////////////////////////////////////////////
//
@@ -289,6 +783,9 @@ public:
OID getElectionId() const;

private:
+ typedef int UnelectableReasonMask;
+ class PingStats;
+
enum UnelectableReason {
None = 0,
CannotSeeMajority = 1 << 0,
@@ -304,8 +801,6 @@ private:
NotCloseEnoughToLatestForPriorityTakeover = 1 << 10,
NotFreshEnoughForCatchupTakeover = 1 << 11,
};
- typedef int UnelectableReasonMask;
-
// Set what type of PRIMARY this node currently is.
void _setLeaderMode(LeaderMode mode);
@@ -525,5 +1020,81 @@ private:
ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
};
+/**
+ * Represents a latency measurement for each replica set member based on heartbeat requests.
+ * The measurement is an average weighted 80% to the old value, and 20% to the new value.
+ *
+ * Also stores information about heartbeat progress and retries.
+ */
+class TopologyCoordinator::PingStats {
+public:
+ /**
+ * Records that a new heartbeat request started at "now".
+ *
+ * This resets the failure count used in determining whether the next request to a target
+ * should be a retry or a regularly scheduled heartbeat message.
+ */
+ void start(Date_t now);
+
+ /**
+ * Records that a heartbeat request completed successfully, and that "millis" milliseconds
+ * were spent for a single network roundtrip plus remote processing time.
+ */
+ void hit(Milliseconds millis);
+
+ /**
+ * Records that a heartbeat request failed.
+ */
+ void miss();
+
+ /**
+ * Gets the number of hit() calls.
+ */
+ unsigned int getCount() const {
+ return count;
+ }
+
+ /**
+ * Gets the weighted average round trip time for heartbeat messages to the target.
+ * Returns 0 if there have been no pings recorded yet.
+ */
+ Milliseconds getMillis() const {
+ return value == UninitializedPing ? 
Milliseconds(0) : value;
+ }
+
+ /**
+ * Gets the date at which start() was last called, which is used to determine if
+ * a heartbeat should be retried or if the time limit has expired.
+ */
+ Date_t getLastHeartbeatStartDate() const {
+ return _lastHeartbeatStartDate;
+ }
+
+ /**
+ * Gets the number of failures since start() was last called.
+ *
+ * This value is incremented by calls to miss(), cleared by calls to start() and
+ * set to the maximum possible value by calls to hit().
+ */
+ int getNumFailuresSinceLastStart() const {
+ return _numFailuresSinceLastStart;
+ }
+
+private:
+ static constexpr Milliseconds UninitializedPing{-1};  // sentinel: -1ms means "no ping recorded yet"
+
+ unsigned int count = 0;  // number of successful heartbeats, i.e. hit() calls
+ Milliseconds value = UninitializedPing;  // weighted-average round trip time; sentinel until first hit()
+ Date_t _lastHeartbeatStartDate;  // when start() was last called
+ int _numFailuresSinceLastStart = std::numeric_limits<int>::max();  // max is the same state hit() leaves it in
+};
+
+//
+// Convenience method for unittest code. Please use accessors otherwise.
+//
+
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role);
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result);
+
} // namespace repl
} // namespace mongo
|