/**
 * Copyright (C) 2014 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #pragma once #include #include #include "mongo/bson/timestamp.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/server_options.h" #include "mongo/util/time_support.h" namespace mongo { class OperationContext; namespace rpc { class ReplSetMetadata; } // namespace rpc namespace repl { static const Milliseconds UninitializedPing{-1}; /** * Represents a latency measurement for each replica set member based on heartbeat requests. * The measurement is an average weighted 80% to the old value, and 20% to the new value. * * Also stores information about heartbeat progress and retries. */ class PingStats { public: /** * Records that a new heartbeat request started at "now". * * This resets the failure count used in determining whether the next request to a target * should be a retry or a regularly scheduled heartbeat message. */ void start(Date_t now); /** * Records that a heartbeat request completed successfully, and that "millis" milliseconds * were spent for a single network roundtrip plus remote processing time. */ void hit(Milliseconds millis); /** * Records that a heartbeat request failed. */ void miss(); /** * Gets the number of hit() calls. */ unsigned int getCount() const { return count; } /** * Gets the weighted average round trip time for heartbeat messages to the target. * Returns 0 if there have been no pings recorded yet. */ Milliseconds getMillis() const { return value == UninitializedPing ? Milliseconds(0) : value; } /** * Gets the date at which start() was last called, which is used to determine if * a heartbeat should be retried or if the time limit has expired. 
*/ Date_t getLastHeartbeatStartDate() const { return _lastHeartbeatStartDate; } /** * Gets the number of failures since start() was last called. * * This value is incremented by calls to miss(), cleared by calls to start() and * set to the maximum possible value by calls to hit(). */ int getNumFailuresSinceLastStart() const { return _numFailuresSinceLastStart; } private: unsigned int count = 0; Milliseconds value = UninitializedPing; Date_t _lastHeartbeatStartDate; int _numFailuresSinceLastStart = std::numeric_limits::max(); }; class TopologyCoordinatorImpl : public TopologyCoordinator { public: struct Options { // A sync source is re-evaluated after it lags behind further than this amount. Seconds maxSyncSourceLagSecs{0}; // Whether or not this node is running as a config server. ClusterRole clusterRole{ClusterRole::None}; }; /** * Constructs a Topology Coordinator object. **/ TopologyCoordinatorImpl(Options options); //////////////////////////////////////////////////////////// // // Implementation of TopologyCoordinator interface // //////////////////////////////////////////////////////////// virtual Role getRole() const; virtual MemberState getMemberState() const; virtual HostAndPort getSyncSourceAddress() const; virtual std::vector getMaybeUpHostAndPorts() const; virtual int getMaintenanceCount() const; virtual long long getTerm(); virtual UpdateTermResult updateTerm(long long term, Date_t now); virtual void setForceSyncSourceIndex(int index); virtual HostAndPort chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampFetched, ChainingPreference chainingPreference); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, const rpc::ReplSetMetadata& metadata, Date_t now) const; virtual bool 
becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); virtual void setElectionSleepUntil(Date_t newTime); virtual void setFollowerMode(MemberState::MS newMode); virtual void adjustMaintenanceCountBy(int inc); virtual void prepareSyncFromResponse(const HostAndPort& target, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result); virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, Date_t now, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result); virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, Date_t now, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result); virtual Status prepareHeartbeatResponse(Date_t now, const ReplSetHeartbeatArgs& args, const std::string& ourSetName, const OpTime& lastOpApplied, const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response); virtual Status prepareHeartbeatResponseV1(Date_t now, const ReplSetHeartbeatArgsV1& args, const std::string& ourSetName, const OpTime& lastOpApplied, const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response); virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, Status* result); virtual void fillIsMasterForReplSet(IsMasterResponse* response); virtual StatusWith prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response); virtual void updateConfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now, const OpTime& lastOpApplied); virtual std::pair prepareHeartbeatRequest( Date_t now, const std::string& ourSetName, const HostAndPort& target); virtual std::pair prepareHeartbeatRequestV1( Date_t now, const std::string& ourSetName, const HostAndPort& target); virtual HeartbeatResponseAction processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, const StatusWith& hbResponse, const OpTime& myLastOpApplied); virtual bool voteForMyself(Date_t now); virtual void 
setElectionInfo(OID electionId, Timestamp electionOpTime); virtual void processWinElection(OID electionId, Timestamp electionOpTime); virtual void processLoseElection(); virtual Status checkShouldStandForElection(Date_t now, const OpTime& lastOpApplied) const; virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message); virtual bool stepDown(Date_t until, bool force, const OpTime& lastOpApplied); virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, const OpTime& lastVisibleOpTime, const OpTime& lastCommitttedOpTime) const; virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response, const OpTime& lastAppliedOpTime); virtual void summarizeAsHtml(ReplSetHtmlSummary* output); virtual void loadLastVote(const LastVote& lastVote); virtual void voteForMyselfV1(); virtual void prepareForStepDown(); virtual void setPrimaryIndex(long long primaryIndex); virtual HeartbeatResponseAction setMemberAsDown(Date_t now, const int memberIndex, const OpTime& myLastOpApplied); virtual Status becomeCandidateIfElectable(const Date_t now, const OpTime& lastOpApplied); virtual void setStorageEngineSupportsReadCommitted(bool supported); //////////////////////////////////////////////////////////// // // Test support methods // //////////////////////////////////////////////////////////// // Changes _memberState to newMemberState. Only for testing. void changeMemberState_forTest(const MemberState& newMemberState, const Timestamp& electionTime = Timestamp(0, 0)); // Sets "_electionTime" to "newElectionTime". Only for testing. void _setElectionTime(const Timestamp& newElectionTime); // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests! // TODO(spencer): Remove this once we can easily call for an election in unit tests to // set the current primary. 
void _setCurrentPrimaryForTest(int primaryIndex); // Returns _electionTime. Only used in unittests. Timestamp getElectionTime() const; // Returns _electionId. Only used in unittests. OID getElectionId() const; // Returns _currentPrimaryIndex. Only used in unittests. int getCurrentPrimaryIndex() const; private: enum UnelectableReason { None = 0, CannotSeeMajority = 1 << 0, NotCloseEnoughToLatestOptime = 1 << 1, ArbiterIAm = 1 << 2, NotSecondary = 1 << 3, NoPriority = 1 << 4, StepDownPeriodActive = 1 << 5, NoData = 1 << 6, NotInitialized = 1 << 7, VotedTooRecently = 1 << 8, RefusesToStand = 1 << 9, NotCloseEnoughToLatestForPriorityTakeover = 1 << 10, }; typedef int UnelectableReasonMask; // Returns the number of heartbeat pings which have occurred. int _getTotalPings(); // Returns the current "ping" value for the given member by their address Milliseconds _getPing(const HostAndPort& host); // Determines if we will veto the member specified by "args.id", given that the last op // we have applied locally is "lastOpApplied". // If we veto, the errmsg will be filled in with a reason bool _shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args, const Date_t& now, const OpTime& lastOpApplied, std::string* errmsg) const; // Returns the index of the member with the matching id, or -1 if none match. int _getMemberIndex(int id) const; // Sees if a majority number of votes are held by members who are currently "up" bool _aMajoritySeemsToBeUp() const; // Is otherOpTime close enough (within 10 seconds) to the latest known optime to qualify // for an election bool _isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime, const OpTime& ourLastOpApplied) const; // Is our optime close enough to the latest known optime to call for a priority takeover. 
bool _amIFreshEnoughForPriorityTakeover(const OpTime& ourLastOpApplied) const; // Returns reason why "self" member is unelectable UnelectableReasonMask _getMyUnelectableReason(const Date_t now, const OpTime& lastOpApplied) const; // Returns reason why memberIndex is unelectable UnelectableReasonMask _getUnelectableReason(int memberIndex, const OpTime& lastOpApplied) const; // Returns the nice text of why the node is unelectable std::string _getUnelectableReasonString(UnelectableReasonMask ur) const; // Return true if we are currently primary bool _iAmPrimary() const; // Scans through all members that are 'up' and return the latest known optime. OpTime _latestKnownOpTime(const OpTime& ourLastOpApplied) const; // Scans the electable set and returns the highest priority member index int _getHighestPriorityElectableIndex(Date_t now, const OpTime& lastOpApplied) const; // Returns true if "one" member is higher priority than "two" member bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const; // Helper shortcut to self config const MemberConfig& _selfConfig() const; // Returns NULL if there is no primary, or the MemberConfig* for the current primary const MemberConfig* _currentPrimaryMember() const; /** * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an * election or stepdown should commence. * _updatePrimaryFromHBDataV1() is a simplified version of _updatePrimaryFromHBData() to be used * when in ProtocolVersion1. */ HeartbeatResponseAction _updatePrimaryFromHBData(int updatedConfigIndex, const MemberState& originalState, Date_t now, const OpTime& lastOpApplied); HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex, const MemberState& originalState, Date_t now, const OpTime& lastOpApplied); /** * Updates _hbdata based on the newConfig, ensuring that every member in the newConfig * has an entry in _hbdata. 
If any nodes in the newConfig are also present in * _currentConfig, copies their heartbeat info into the corresponding entry in the updated * _hbdata vector. */ void _updateHeartbeatDataForReconfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now); void _stepDownSelfAndReplaceWith(int newPrimary); /** * Looks up the provided member in the blacklist and returns true if the member's blacklist * expire time is after 'now'. If the member is found but the expire time is before 'now', * the function returns false. If the member is not found in the blacklist, the function * returns false. **/ bool _memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const; // This node's role in the replication protocol. Role _role; // This is a unique id that is generated and set each time we transition to PRIMARY, as the // result of an election. OID _electionId; // The time at which the current PRIMARY was elected. Timestamp _electionTime; // This node's election term. The term is used as part of the consensus algorithm to elect // and maintain one primary (leader) node in the cluster. 
long long _term; // the index of the member we currently believe is primary, if one exists, otherwise -1 int _currentPrimaryIndex; // the hostandport we are currently syncing from // empty if no sync source (we are primary, or we cannot connect to anyone yet) HostAndPort _syncSource; // These members are not chosen as sync sources for a period of time, due to connection // issues with them std::map _syncSourceBlacklist; // The next sync source to be chosen, requested via a replSetSyncFrom command int _forceSyncSourceIndex; // Options for this TopologyCoordinator Options _options; // "heartbeat message" // sent in requestHeartbeat respond in field "hbm" std::string _hbmsg; Date_t _hbmsgTime; // when it was logged // heartbeat msg to send to others; descriptive diagnostic info std::string _getHbmsg(Date_t now) const; int _selfIndex; // this node's index in _members and _currentConfig ReplicaSetConfig _rsConfig; // The current config, including a vector of MemberConfigs // heartbeat data for each member. It is guaranteed that this vector will be maintained // in the same order as the MemberConfigs in _currentConfig, therefore the member config // index can be used to index into this vector as well. std::vector _hbdata; // Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat) bool _stepDownPending; // Time when stepDown command expires Date_t _stepDownUntil; // A time before which this node will not stand for election. // In protocol version 1, this is used to prevent running for election after seeing // a new term. Date_t _electionSleepUntil; // The number of calls we have had to enter maintenance mode int _maintenanceModeCalls; // The sub-mode of follower that we are in. Legal values are RS_SECONDARY, RS_RECOVERING, // RS_STARTUP2 (initial sync) and RS_ROLLBACK. Only meaningful if _role == Role::follower. // Configured via setFollowerMode(). 
If the sub-mode is RS_SECONDARY, then the effective // sub-mode is either RS_SECONDARY or RS_RECOVERING, depending on _maintenanceModeCalls. // Rather than accesing this variable direclty, one should use the getMemberState() method, // which computes the replica set node state on the fly. MemberState::MS _followerMode; typedef std::map PingMap; // Ping stats for each member by HostAndPort; PingMap _pings; // Last vote info from the election struct VoteLease { static const Seconds leaseTime; Date_t when; int whoId = -1; HostAndPort whoHostAndPort; } _voteLease; // V1 last vote info for elections LastVote _lastVote; enum class ReadCommittedSupport { kUnknown, kNo, kYes, }; // Whether or not the storage engine supports read committed. ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown}; }; } // namespace repl } // namespace mongo