diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.h')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 1772 |
1 files changed, 877 insertions, 895 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 8fcd9671dae..7183145abcd 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -53,991 +53,973 @@ namespace mongo { - class Timer; - template <typename T> class StatusWith; +class Timer; +template <typename T> +class StatusWith; namespace repl { - class ElectCmdRunner; - class ElectionWinnerDeclarer; - class FreshnessChecker; - class HandshakeArgs; - class HeartbeatResponseAction; - class LastVote; - class OplogReader; - class ReplSetDeclareElectionWinnerArgs; - class ReplSetRequestVotesArgs; - class ReplicaSetConfig; - class SyncSourceFeedback; - class TopologyCoordinator; - class VoteRequester; +class ElectCmdRunner; +class ElectionWinnerDeclarer; +class FreshnessChecker; +class HandshakeArgs; +class HeartbeatResponseAction; +class LastVote; +class OplogReader; +class ReplSetDeclareElectionWinnerArgs; +class ReplSetRequestVotesArgs; +class ReplicaSetConfig; +class SyncSourceFeedback; +class TopologyCoordinator; +class VoteRequester; - class ReplicationCoordinatorImpl : public ReplicationCoordinator, - public KillOpListenerInterface { - MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); +class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface { + MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); - public: +public: + // Takes ownership of the "externalState", "topCoord" and "network" objects. + ReplicationCoordinatorImpl(const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + executor::NetworkInterface* network, + StorageInterface* storage, + TopologyCoordinator* topoCoord, + int64_t prngSeed); + // Takes ownership of the "externalState" and "topCoord" objects. 
+ ReplicationCoordinatorImpl(const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + TopologyCoordinator* topoCoord, + ReplicationExecutor* replExec, + int64_t prngSeed); + virtual ~ReplicationCoordinatorImpl(); - // Takes ownership of the "externalState", "topCoord" and "network" objects. - ReplicationCoordinatorImpl(const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - executor::NetworkInterface* network, - StorageInterface* storage, - TopologyCoordinator* topoCoord, - int64_t prngSeed); - // Takes ownership of the "externalState" and "topCoord" objects. - ReplicationCoordinatorImpl(const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - TopologyCoordinator* topoCoord, - ReplicationExecutor* replExec, - int64_t prngSeed); - virtual ~ReplicationCoordinatorImpl(); + // ================== Members of public ReplicationCoordinator API =================== - // ================== Members of public ReplicationCoordinator API =================== + virtual void startReplication(OperationContext* txn) override; - virtual void startReplication(OperationContext* txn) override; + virtual void shutdown() override; - virtual void shutdown() override; + virtual const ReplSettings& getSettings() const override; - virtual const ReplSettings& getSettings() const override; + virtual Mode getReplicationMode() const override; - virtual Mode getReplicationMode() const override; + virtual MemberState getMemberState() const override; - virtual MemberState getMemberState() const override; + virtual bool isInPrimaryOrSecondaryState() const override; - virtual bool isInPrimaryOrSecondaryState() const override; + virtual Seconds getSlaveDelaySecs() const override; - virtual Seconds getSlaveDelaySecs() const override; + virtual void clearSyncSourceBlacklist() override; - virtual void clearSyncSourceBlacklist() override; + /* + * Implementation of the KillOpListenerInterface interrupt method so that we 
can wake up + * threads blocked in awaitReplication() when a killOp command comes in. + */ + virtual void interrupt(unsigned opId); - /* - * Implementation of the KillOpListenerInterface interrupt method so that we can wake up - * threads blocked in awaitReplication() when a killOp command comes in. - */ - virtual void interrupt(unsigned opId); + /* + * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up + * threads blocked in awaitReplication() when we kill all operations. + */ + virtual void interruptAll(); - /* - * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up - * threads blocked in awaitReplication() when we kill all operations. - */ - virtual void interruptAll(); + virtual ReplicationCoordinator::StatusAndDuration awaitReplication( + OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern); - virtual ReplicationCoordinator::StatusAndDuration awaitReplication( - OperationContext* txn, - const OpTime& opTime, - const WriteConcernOptions& writeConcern); + virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient( + OperationContext* txn, const WriteConcernOptions& writeConcern); - virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient( - OperationContext* txn, - const WriteConcernOptions& writeConcern); + virtual Status stepDown(OperationContext* txn, + bool force, + const Milliseconds& waitTime, + const Milliseconds& stepdownTime); - virtual Status stepDown(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime); + virtual bool isMasterForReportingPurposes(); - virtual bool isMasterForReportingPurposes(); + virtual bool canAcceptWritesForDatabase(StringData dbName); - virtual bool canAcceptWritesForDatabase(StringData dbName); + bool canAcceptWritesFor(const NamespaceString& ns) override; - bool canAcceptWritesFor(const NamespaceString& ns) override; + 
virtual Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions& writeConcern) const; - virtual Status checkIfWriteConcernCanBeSatisfied( - const WriteConcernOptions& writeConcern) const; + virtual Status checkCanServeReadsFor(OperationContext* txn, + const NamespaceString& ns, + bool slaveOk); - virtual Status checkCanServeReadsFor(OperationContext* txn, - const NamespaceString& ns, - bool slaveOk); + virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); - virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); + virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts); - virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts); + virtual void setMyLastOptime(const OpTime& opTime); - virtual void setMyLastOptime(const OpTime& opTime); + virtual void resetMyLastOptime(); - virtual void resetMyLastOptime(); + virtual void setMyHeartbeatMessage(const std::string& msg); - virtual void setMyHeartbeatMessage(const std::string& msg); + virtual OpTime getMyLastOptime() const override; - virtual OpTime getMyLastOptime() const override; + virtual ReadAfterOpTimeResponse waitUntilOpTime(OperationContext* txn, + const ReadAfterOpTimeArgs& settings) override; - virtual ReadAfterOpTimeResponse waitUntilOpTime( - OperationContext* txn, - const ReadAfterOpTimeArgs& settings) override; + virtual OID getElectionId() override; - virtual OID getElectionId() override; + virtual OID getMyRID() const override; - virtual OID getMyRID() const override; + virtual int getMyId() const override; - virtual int getMyId() const override; + virtual bool setFollowerMode(const MemberState& newState) override; - virtual bool setFollowerMode(const MemberState& newState) override; + virtual bool isWaitingForApplierToDrain() override; - virtual bool isWaitingForApplierToDrain() override; + virtual void signalDrainComplete(OperationContext* txn) override; - virtual void signalDrainComplete(OperationContext* txn) override; + virtual void 
signalUpstreamUpdater() override; - virtual void signalUpstreamUpdater() override; + virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override; - virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override; + virtual Status processReplSetGetStatus(BSONObjBuilder* result) override; - virtual Status processReplSetGetStatus(BSONObjBuilder* result) override; + virtual void fillIsMasterForReplSet(IsMasterResponse* result) override; - virtual void fillIsMasterForReplSet(IsMasterResponse* result) override; + virtual void appendSlaveInfoData(BSONObjBuilder* result) override; - virtual void appendSlaveInfoData(BSONObjBuilder* result) override; + virtual ReplicaSetConfig getConfig() const override; - virtual ReplicaSetConfig getConfig() const override; + virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual void processReplSetGetConfig(BSONObjBuilder* result) override; + virtual Status setMaintenanceMode(bool activate) override; - virtual Status setMaintenanceMode(bool activate) override; + virtual bool getMaintenanceMode() override; - virtual bool getMaintenanceMode() override; + virtual Status processReplSetSyncFrom(const HostAndPort& target, + BSONObjBuilder* resultObj) override; - virtual Status processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj) override; + virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override; - virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override; + virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, + ReplSetHeartbeatResponse* response) override; - virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response) override; + virtual Status processReplSetReconfig(OperationContext* txn, + const ReplSetReconfigArgs& args, + BSONObjBuilder* resultObj) override; - virtual Status processReplSetReconfig(OperationContext* txn, - const 
ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj) override; + virtual Status processReplSetInitiate(OperationContext* txn, + const BSONObj& configObj, + BSONObjBuilder* resultObj) override; - virtual Status processReplSetInitiate(OperationContext* txn, - const BSONObj& configObj, - BSONObjBuilder* resultObj) override; + virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override; - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override; + virtual void incrementRollbackID() override; - virtual void incrementRollbackID() override; + virtual Status processReplSetFresh(const ReplSetFreshArgs& args, + BSONObjBuilder* resultObj) override; - virtual Status processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj) override; + virtual Status processReplSetElect(const ReplSetElectArgs& args, + BSONObjBuilder* response) override; - virtual Status processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* response) override; + virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, + long long* configVersion) override; - virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, - long long* configVersion) override; + virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake) override; - virtual Status processHandshake(OperationContext* txn, - const HandshakeArgs& handshake) override; + virtual bool buildsIndexes() override; - virtual bool buildsIndexes() override; + virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op) override; - virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op) override; + virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; + virtual WriteConcernOptions getGetLastErrorDefault() override; - virtual WriteConcernOptions getGetLastErrorDefault() override; + virtual Status 
checkReplEnabledForCommand(BSONObjBuilder* result) override; - virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; + virtual bool isReplEnabled() const override; - virtual bool isReplEnabled() const override; + virtual HostAndPort chooseNewSyncSource() override; - virtual HostAndPort chooseNewSyncSource() override; + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; + virtual void resetLastOpTimeFromOplog(OperationContext* txn) override; - virtual void resetLastOpTimeFromOplog(OperationContext* txn) override; + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override; + virtual OpTime getLastCommittedOpTime() const override; - virtual OpTime getLastCommittedOpTime() const override; + virtual Status processReplSetRequestVotes(OperationContext* txn, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) override; - virtual Status processReplSetRequestVotes(OperationContext* txn, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) override; + virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, + long long* responseTerm) override; - virtual Status processReplSetDeclareElectionWinner( - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm) override; + virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); + virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, + ReplSetHeartbeatResponse* response) override; - virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response) override; + virtual bool isV1ElectionProtocol() override; - virtual bool isV1ElectionProtocol() override; 
+ virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; - virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; + /** + * Get current term from topology coordinator + */ + virtual long long getTerm() override; - /** - * Get current term from topology coordinator - */ - virtual long long getTerm() override; + virtual bool updateTerm(long long term) override; - virtual bool updateTerm(long long term) override; + // ================== Test support API =================== - // ================== Test support API =================== + /** + * If called after startReplication(), blocks until all asynchronous + * activities associated with replication start-up complete. + */ + void waitForStartUpComplete(); - /** - * If called after startReplication(), blocks until all asynchronous - * activities associated with replication start-up complete. - */ - void waitForStartUpComplete(); + /** + * Gets the replica set configuration in use by the node. + */ + ReplicaSetConfig getReplicaSetConfig_forTest(); - /** - * Gets the replica set configuration in use by the node. - */ - ReplicaSetConfig getReplicaSetConfig_forTest(); + /** + * Simple wrapper around _setLastOptime_inlock to make it easier to test. + */ + Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); - /** - * Simple wrapper around _setLastOptime_inlock to make it easier to test. - */ - Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); - - bool updateTerm_forTest(long long term); - - private: - ReplicationCoordinatorImpl(const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - TopologyCoordinator* topCoord, - int64_t prngSeed, - executor::NetworkInterface* network, - StorageInterface* storage, - ReplicationExecutor* replExec); - /** - * Configuration states for a replica set node. 
- * - * Transition diagram: - * - * PreStart ------------------> ReplicationDisabled - * | - * | - * v - * StartingUp -------> Uninitialized <------> Initiating - * \ ^ | - * ------- | | - * | | | - * v v | - * Reconfig <---> Steady <----> HBReconfig | - * ^ / - * | / - * \ / - * ----------------------- - */ - enum ConfigState { - kConfigPreStart, - kConfigStartingUp, - kConfigReplicationDisabled, - kConfigUninitialized, - kConfigSteady, - kConfigInitiating, - kConfigReconfiguring, - kConfigHBReconfiguring - }; - - /** - * Type describing actions to take after a change to the MemberState _memberState. - */ - enum PostMemberStateUpdateAction { - kActionNone, - kActionCloseAllConnections, // Also indicates that we should clear sharding state. - kActionFollowerModeStateChange, - kActionWinElection - }; - - // Struct that holds information about clients waiting for replication. - struct WaiterInfo; - - // Struct that holds information about nodes in this replication group, mainly used for - // tracking replication progress for write concern satisfaction. - struct SlaveInfo { - OpTime opTime; // Our last known OpTime that this slave has replicated to. - HostAndPort hostAndPort; // Client address of the slave. - int memberId; // Id of the node in the replica set config, or -1 if we're not a replSet. - OID rid; // RID of the node. - bool self; // Whether this SlaveInfo stores the information about ourself - SlaveInfo() : memberId(-1), self(false) {} - }; - - typedef std::vector<SlaveInfo> SlaveInfoVector; - - typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; - - /** - * Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer - * to it, or returns NULL if there is no SlaveInfo with the given RID. 
- */ - SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid); - - /** - * Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a - * pointer to it, or returns NULL if there is no SlaveInfo with the given member ID. - */ - SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID); - - /** - * Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication - * that now have their write concern satisfied. Only valid to call in master/slave setups. - */ - void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo); - - /** - * Updates the item in _slaveInfo pointed to by 'slaveInfo' with the given OpTime 'opTime' - * and wakes up any threads waiting for replication that now have their write concern - * satisfied. - */ - void _updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime); - - /** - * Returns the index into _slaveInfo where data corresponding to ourself is stored. - * For more info on the rules about how we know where our entry is, see the comment for - * _slaveInfo. - */ - size_t _getMyIndexInSlaveInfo_inlock() const; - - /** - * Helper method that removes entries from _slaveInfo if they correspond to a node - * with a member ID that is not in the current replica set config. Will always leave an - * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the - * config. - */ - void _updateSlaveInfoFromConfig_inlock(); - - /** - * Helper to update our saved config, cancel any pending heartbeats, and kick off sending - * new heartbeats based on the new config. Must *only* be called from within the - * ReplicationExecutor context. - * - * Returns an action to be performed after unlocking _mutex, via - * _performPostMemberStateUpdateAction. - */ - PostMemberStateUpdateAction _setCurrentRSConfig_inlock( - const ReplicaSetConfig& newConfig, - int myIndex); - - /** - * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. 
- */ - void _wakeReadyWaiters_inlock(); - - /** - * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode() - * to run in a global write lock in the replication executor thread. - */ - void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, - bool activate, - Status* result); - - /** - * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run - * in the replication executor thread. - */ - void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, - bool* maintenanceMode); - - /** - * Bottom half of fillIsMasterForReplSet. - */ - void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData, - IsMasterResponse* result); - - /** - * Bottom half of processReplSetFresh. - */ - void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetFreshArgs& args, - BSONObjBuilder* response, - Status* result); - - /** - * Bottom half of processReplSetElect. - */ - void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetElectArgs& args, - BSONObjBuilder* response, - Status* result); - - /** - * Bottom half of processReplSetFreeze. - */ - void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData, - int secs, - BSONObjBuilder* response, - Status* result); - /* - * Bottom half of clearSyncSourceBlacklist - */ - void _clearSyncSourceBlacklist_finish(const ReplicationExecutor::CallbackArgs& cbData); - - /** - * Bottom half of processReplSetDeclareElectionWinner. - */ - void _processReplSetDeclareElectionWinner_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm, - Status* result); - - /** - * Bottom half of processReplSetRequestVotes. 
- */ - void _processReplSetRequestVotes_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response, - Status* result); - - /** - * Scheduled to cause the ReplicationCoordinator to reconsider any state that might - * need to change as a result of time passing - for instance becoming PRIMARY when a single - * node replica set member's stepDown period ends. - */ - void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData); - - /** - * Helper method for _awaitReplication that takes an already locked unique_lock and a - * Timer for timing the operation which has been counting since before the lock was - * acquired. - */ - ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock( - const Timer* timer, - stdx::unique_lock<stdx::mutex>* lock, - OperationContext* txn, - const OpTime& opTime, - const WriteConcernOptions& writeConcern); - - /* - * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable. - */ - bool _doneWaitingForReplication_inlock(const OpTime& opTime, - const WriteConcernOptions& writeConcern); - - /** - * Helper for _doneWaitingForReplication_inlock that takes an integer write concern. - */ - bool _haveNumNodesReachedOpTime_inlock(const OpTime& opTime, int numNodes); - - /** - * Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a - * named write concern mode. - */ - bool _haveTaggedNodesReachedOpTime_inlock(const OpTime& opTime, - const ReplicaSetTagPattern& tagPattern); - - Status _checkIfWriteConcernCanBeSatisfied_inlock( - const WriteConcernOptions& writeConcern) const; - - /** - * Triggers all callbacks that are blocked waiting for new heartbeat data - * to decide whether or not to finish a step down. - * Should only be called from executor callbacks. 
- */ - void _signalStepDownWaitersFromCallback(const ReplicationExecutor::CallbackArgs& cbData); - void _signalStepDownWaiters(); - - /** - * Helper for stepDown run within a ReplicationExecutor callback. This method assumes - * it is running within a global shared lock, and thus that no writes are going on at the - * same time. - */ - void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::EventHandle finishedEvent, - OperationContext* txn, - Date_t waitUntil, - Date_t stepdownUntil, - bool force, - Status* result); - - OID _getMyRID_inlock() const; - - int _getMyId_inlock() const; - - OpTime _getMyLastOptime_inlock() const; - - /** - * Bottom half of setFollowerMode. - * - * May reschedule itself after the current election, so it is not sufficient to - * wait for a callback scheduled to execute this method to complete. Instead, - * supply an event, "finishedSettingFollowerMode", and wait for that event to - * be signaled. Do not observe "*success" until after the event is signaled. - */ - void _setFollowerModeFinish( - const ReplicationExecutor::CallbackArgs& cbData, - const MemberState& newState, - const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, - bool* success); - - /** - * Helper method for updating our tracking of the last optime applied by a given node. - * This is only valid to call on replica sets. - * "configVersion" will be populated with our config version if it and the configVersion - * of "args" differ. - */ - Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args, - long long* configVersion); - - /** - * Helper method for setMyLastOptime that takes in a unique lock on - * _mutex. The passed in lock must already be locked. It is unspecified what state the - * lock will be in after this method finishes. - * - * This function has the same rules for "opTime" as setMyLastOptime(), unless - * "isRollbackAllowed" is true. 
- */ - void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock, - const OpTime& opTime, - bool isRollbackAllowed); - - /** - * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index - * into the replica set config members array that corresponds to the "target", or -1 if - * "target" is not in _rsConfig. - */ - void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when); - - /** - * Processes each heartbeat response. - * - * Schedules additional heartbeats, triggers elections and step downs, etc. - */ - void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, - int targetIndex); - - void _handleHeartbeatResponseV1( - const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, - int targetIndex); - - void _trackHeartbeatHandle(const StatusWith<ReplicationExecutor::CallbackHandle>& handle); - - void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); - - /** - * Helper for _handleHeartbeatResponse. - * - * Updates the optime associated with the member at "memberIndex" in our config. - */ - void _updateOpTimeFromHeartbeat_inlock(int memberIndex, const OpTime& optime); - - /** - * Starts a heartbeat for each member in the current config. Called within the executor - * context. - */ - void _startHeartbeats(); - - /** - * Cancels all heartbeats. Called within executor context. - */ - void _cancelHeartbeats(); - - /** - * Asynchronously sends a heartbeat to "target". "targetIndex" is the index - * into the replica set config members array that corresponds to the "target", or -1 if - * we don't have a valid replica set config. - * - * Scheduled by _scheduleHeartbeatToTarget. 
- */ - void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, - const HostAndPort& target, - int targetIndex); - - - MemberState _getMemberState_inlock() const; - - /** - * Callback that gives the TopologyCoordinator an initial LastVote document from - * local storage. - * - * Called only during replication startup. All other updates come from the - * TopologyCoordinator itself. - */ - void _updateLastVote(const LastVote& lastVote); - - /** - * Starts loading the replication configuration from local storage, and if it is valid, - * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set - * config (sets _rsConfig and _thisMembersConfigIndex). - * Returns true if it finishes loading the local config, which most likely means there - * was no local config at all or it was invalid in some way, and false if there was a valid - * config detected but more work is needed to set it as the local config (which will be - * handled by the callback to _finishLoadLocalConfig). - */ - bool _startLoadLocalConfig(OperationContext* txn); - - /** - * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState - * to kConfigSteady, so that we can begin processing heartbeats and reconfigs. - */ - void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& localConfig, - const StatusWith<OpTime>& lastOpTimeStatus); - - /** - * Callback that finishes the work of processReplSetInitiate() inside the replication - * executor context, in the event of a successful quorum check. - */ - void _finishReplSetInitiate( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex); - - /** - * Callback that finishes the work of processReplSetReconfig inside the replication - * executor context, in the event of a successful quorum check. 
- */ - void _finishReplSetReconfig( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex); - - /** - * Changes _rsConfigState to newState, and notify any waiters. - */ - void _setConfigState_inlock(ConfigState newState); - - /** - * Updates the cached value, _memberState, to match _topCoord's reported - * member state, from getMemberState(). - * - * Returns an enum indicating what action to take after releasing _mutex, if any. - * Call performPostMemberStateUpdateAction on the return value after releasing - * _mutex. - */ - PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(); - - /** - * Performs a post member-state update action. Do not call while holding _mutex. - */ - void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action); - - /** - * Begins an attempt to elect this node. - * Called after an incoming heartbeat changes this node's view of the set such that it - * believes it can be elected PRIMARY. - * For proper concurrency, must be called via a ReplicationExecutor callback. - * - * For old style elections the election path is: - * _startElectSelf() - * _onFreshnessCheckComplete() - * _onElectCmdRunnerComplete() - * For V1 (raft) style elections the election path is: - * _startElectSelfV1() - * _onDryRunComplete() - * _onVoteRequestComplete() - * _onElectionWinnerDeclarerComplete() - */ - void _startElectSelf(); - void _startElectSelfV1(); - - /** - * Callback called when the FreshnessChecker has completed; checks the results and - * decides whether to continue election proceedings. - **/ - void _onFreshnessCheckComplete(); - - /** - * Callback called when the ElectCmdRunner has completed; checks the results and - * decides whether to complete the election and change state to primary. 
- **/ - void _onElectCmdRunnerComplete(); - - /** - * Callback called when the dryRun VoteRequester has completed; checks the results and - * decides whether to conduct a proper election. - * "originalTerm" was the term during which the dry run began, if the term has since - * changed, do not run for election. - */ - void _onDryRunComplete(long long originalTerm); - - /** - * Callback called when the VoteRequester has completed; checks the results and - * decides whether to change state to primary and alert other nodes of our primary-ness. - * "originalTerm" was the term during which the election began, if the term has since - * changed, do not step up as primary. - */ - void _onVoteRequestComplete(long long originalTerm); - - /** - * Callback called when the ElectWinnerDeclarer has completed; checks the results and - * if we received any negative responses, relinquish primary. - */ - void _onElectionWinnerDeclarerComplete(); - - /** - * Callback called after a random delay, to prevent repeated election ties. - */ - void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData); - - /** - * Chooses a new sync source. Must be scheduled as a callback. - * - * Calls into the Topology Coordinator, which uses its current view of the set to choose - * the most appropriate sync source. - */ - void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - HostAndPort* newSyncSource); - - /** - * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot - * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be - * run at 'until'. - * - * Must be scheduled as a callback. - */ - void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& host, - Date_t until); - - /** - * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply - * ignored and no error is thrown. - * - * Must be scheduled as a callback. 
- */ - void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& host); - - /** - * Determines if a new sync source should be considered. - * - * Must be scheduled as a callback. - */ - void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& currentSource, - bool* shouldChange); - - /** - * Schedules a request that the given host step down; logs any errors. - */ - void _requestRemotePrimaryStepdown(const HostAndPort& target); - - void _heartbeatStepDownStart(); - - /** - * Completes a step-down of the current node. Must be run with a global - * shared or global exclusive lock. - */ - void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData); - - /** - * Schedules a replica set config change. - */ - void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig); - - /** - * Callback that continues a heartbeat-initiated reconfig after a running election - * completes. - */ - void _heartbeatReconfigAfterElectionCanceled( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig); - - /** - * Method to write a configuration transmitted via heartbeat message to stable storage. - */ - void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd, - const ReplicaSetConfig& newConfig); - - /** - * Conclusion actions of a heartbeat-triggered reconfiguration. - */ - void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - StatusWith<int> myIndex); - - /** - * Utility method that schedules or performs actions specified by a HeartbeatResponseAction - * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given - * value of "responseStatus". 
- */ - void _handleHeartbeatResponseAction( - const HeartbeatResponseAction& action, - const StatusWith<ReplSetHeartbeatResponse>& responseStatus); - - /** - * Bottom half of processHeartbeat(), which runs in the replication executor. - */ - void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response, - Status* outStatus); - - /** - * Bottom half of processHeartbeatV1(), which runs in the replication executor. - */ - void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response, - Status* outStatus); - /** - * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of - * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry. - */ - void _updateLastCommittedOpTime_inlock(); - - void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData, - ReplSetHtmlSummary* output); - - /** - * Callback that gets the current term from topology coordinator. - */ - void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term); - - - /** - * Callback that attempts to set the current term in topology coordinator and - * relinquishes primary if the term actually changes and we are primary. - */ - void _updateTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, - long long term, - bool* updated, - Handle* cbHandle); - bool _updateTerm_incallback(long long term, Handle* cbHandle); - - // - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (R) Read-only in concurrent operation; no synchronization required. - // (S) Self-synchronizing; access in any way from any context. - // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing; - // Access in any context. 
- // (M) Reads and writes guarded by _mutex - // (X) Reads and writes must be performed in a callback in _replExecutor - // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold - // _mutex or be in a callback in _replExecutor to read. - // (GX) Readable under a global intent lock. Must either hold global lock in exclusive - // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor - // context to write. - // (I) Independently synchronized, see member variable comment. - - // Protects member data of this ReplicationCoordinator. - mutable stdx::mutex _mutex; // (S) - - // Handles to actively queued heartbeats. - HeartbeatHandles _heartbeatHandles; // (X) - - // When this node does not know itself to be a member of a config, it adds - // every host that sends it a heartbeat request to this set, and also starts - // sending heartbeat requests to that host. This set is cleared whenever - // a node discovers that it is a member of a config. - unordered_set<HostAndPort> _seedList; // (X) - - // Parsed command line arguments related to replication. - const ReplSettings _settings; // (R) - - // Mode of replication specified by _settings. - const Mode _replMode; // (R) - - // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator. - std::unique_ptr<TopologyCoordinator> _topCoord; // (X) - - // If the executer is owned then this will be set, but should not be used. - // This is only used to clean up and destroy the replExec if owned - std::unique_ptr<ReplicationExecutor> _replExecutorIfOwned; // (S) - // Executor that drives the topology coordinator. - ReplicationExecutor& _replExecutor; // (S) - - // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator. - std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS) - - // Thread that drives actions in the topology coordinator - // Set in startReplication() and thereafter accessed in shutdown. 
- std::unique_ptr<stdx::thread> _topCoordDriverThread; // (I) - - // Our RID, used to identify us to our sync source when sending replication progress - // updates upstream. Set once in startReplication() and then never modified again. - OID _myRID; // (M) - - // Rollback ID. Used to check if a rollback happened during some interval of time - // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. - int _rbid; // (M) - - // list of information about clients waiting on replication. Does *not* own the - // WaiterInfos. - std::vector<WaiterInfo*> _replicationWaiterList; // (M) - - // list of information about clients waiting for a particular opTime. - // Does *not* own the WaiterInfos. - std::vector<WaiterInfo*> _opTimeWaiterList; // (M) - - // Set to true when we are in the process of shutting down replication. - bool _inShutdown; // (M) - - // Election ID of the last election that resulted in this node becoming primary. - OID _electionId; // (M) - - // Vector containing known information about each member (such as replication - // progress and member ID) in our replica set or each member replicating from - // us in a master-slave deployment. In master/slave, the first entry is - // guaranteed to correspond to ourself. In replica sets where we don't have a - // valid config or are in state REMOVED then the vector will be a single element - // just with info about ourself. In replica sets with a valid config the elements - // will be in the same order as the members in the replica set config, thus - // the entry for ourself will be at _thisMemberConfigIndex. - SlaveInfoVector _slaveInfo; // (M) - - // Current ReplicaSet state. - MemberState _memberState; // (MX) - - // True if we are waiting for the applier to finish draining. - bool _isWaitingForDrainToComplete; // (M) - - // Used to signal threads waiting for changes to _rsConfigState. 
- stdx::condition_variable _rsConfigStateChange; // (M) - - // Represents the configuration state of the coordinator, which controls how and when - // _rsConfig may change. See the state transition diagram in the type definition of - // ConfigState for details. - ConfigState _rsConfigState; // (M) - - // The current ReplicaSet configuration object, including the information about tag groups - // that is used to satisfy write concern requests with named gle modes. - ReplicaSetConfig _rsConfig; // (MX) - - // This member's index position in the current config. - int _selfIndex; // (MX) - - // Vector of events that should be signaled whenever new heartbeat data comes in. - std::vector<ReplicationExecutor::EventHandle> _stepDownWaiters; // (X) + bool updateTerm_forTest(long long term); - // State for conducting an election of this node. - // the presence of a non-null _freshnessChecker pointer indicates that an election is - // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer - // indicates this instead. - // Only one election is allowed at a time. - std::unique_ptr<FreshnessChecker> _freshnessChecker; // (X) +private: + ReplicationCoordinatorImpl(const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + TopologyCoordinator* topCoord, + int64_t prngSeed, + executor::NetworkInterface* network, + StorageInterface* storage, + ReplicationExecutor* replExec); + /** + * Configuration states for a replica set node. 
+ * + * Transition diagram: + * + * PreStart ------------------> ReplicationDisabled + * | + * | + * v + * StartingUp -------> Uninitialized <------> Initiating + * \ ^ | + * ------- | | + * | | | + * v v | + * Reconfig <---> Steady <----> HBReconfig | + * ^ / + * | / + * \ / + * ----------------------- + */ + enum ConfigState { + kConfigPreStart, + kConfigStartingUp, + kConfigReplicationDisabled, + kConfigUninitialized, + kConfigSteady, + kConfigInitiating, + kConfigReconfiguring, + kConfigHBReconfiguring + }; + + /** + * Type describing actions to take after a change to the MemberState _memberState. + */ + enum PostMemberStateUpdateAction { + kActionNone, + kActionCloseAllConnections, // Also indicates that we should clear sharding state. + kActionFollowerModeStateChange, + kActionWinElection + }; - std::unique_ptr<ElectCmdRunner> _electCmdRunner; // (X) + // Struct that holds information about clients waiting for replication. + struct WaiterInfo; + + // Struct that holds information about nodes in this replication group, mainly used for + // tracking replication progress for write concern satisfaction. + struct SlaveInfo { + OpTime opTime; // Our last known OpTime that this slave has replicated to. + HostAndPort hostAndPort; // Client address of the slave. + int memberId; // Id of the node in the replica set config, or -1 if we're not a replSet. + OID rid; // RID of the node. + bool self; // Whether this SlaveInfo stores the information about ourself + SlaveInfo() : memberId(-1), self(false) {} + }; - std::unique_ptr<VoteRequester> _voteRequester; // (X) + typedef std::vector<SlaveInfo> SlaveInfoVector; + + typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; + + /** + * Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer + * to it, or returns NULL if there is no SlaveInfo with the given RID. 
+ */ + SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid); + + /** + * Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a + * pointer to it, or returns NULL if there is no SlaveInfo with the given member ID. + */ + SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID); + + /** + * Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication + * that now have their write concern satisfied. Only valid to call in master/slave setups. + */ + void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo); + + /** + * Updates the item in _slaveInfo pointed to by 'slaveInfo' with the given OpTime 'opTime' + * and wakes up any threads waiting for replication that now have their write concern + * satisfied. + */ + void _updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime); + + /** + * Returns the index into _slaveInfo where data corresponding to ourself is stored. + * For more info on the rules about how we know where our entry is, see the comment for + * _slaveInfo. + */ + size_t _getMyIndexInSlaveInfo_inlock() const; + + /** + * Helper method that removes entries from _slaveInfo if they correspond to a node + * with a member ID that is not in the current replica set config. Will always leave an + * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the + * config. + */ + void _updateSlaveInfoFromConfig_inlock(); + + /** + * Helper to update our saved config, cancel any pending heartbeats, and kick off sending + * new heartbeats based on the new config. Must *only* be called from within the + * ReplicationExecutor context. + * + * Returns an action to be performed after unlocking _mutex, via + * _performPostMemberStateUpdateAction. + */ + PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig, + int myIndex); + + /** + * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. 
+ */ + void _wakeReadyWaiters_inlock(); + + /** + * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode() + * to run in a global write lock in the replication executor thread. + */ + void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, + bool activate, + Status* result); + + /** + * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run + * in the replication executor thread. + */ + void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, + bool* maintenanceMode); + + /** + * Bottom half of fillIsMasterForReplSet. + */ + void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData, + IsMasterResponse* result); + + /** + * Bottom half of processReplSetFresh. + */ + void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetFreshArgs& args, + BSONObjBuilder* response, + Status* result); + + /** + * Bottom half of processReplSetElect. + */ + void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetElectArgs& args, + BSONObjBuilder* response, + Status* result); + + /** + * Bottom half of processReplSetFreeze. + */ + void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData, + int secs, + BSONObjBuilder* response, + Status* result); + /* + * Bottom half of clearSyncSourceBlacklist + */ + void _clearSyncSourceBlacklist_finish(const ReplicationExecutor::CallbackArgs& cbData); + + /** + * Bottom half of processReplSetDeclareElectionWinner. + */ + void _processReplSetDeclareElectionWinner_finish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetDeclareElectionWinnerArgs& args, + long long* responseTerm, + Status* result); + + /** + * Bottom half of processReplSetRequestVotes. 
+ */ + void _processReplSetRequestVotes_finish(const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + Status* result); + + /** + * Scheduled to cause the ReplicationCoordinator to reconsider any state that might + * need to change as a result of time passing - for instance becoming PRIMARY when a single + * node replica set member's stepDown period ends. + */ + void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData); + + /** + * Helper method for _awaitReplication that takes an already locked unique_lock and a + * Timer for timing the operation which has been counting since before the lock was + * acquired. + */ + ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock( + const Timer* timer, + stdx::unique_lock<stdx::mutex>* lock, + OperationContext* txn, + const OpTime& opTime, + const WriteConcernOptions& writeConcern); + + /* + * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable. + */ + bool _doneWaitingForReplication_inlock(const OpTime& opTime, + const WriteConcernOptions& writeConcern); + + /** + * Helper for _doneWaitingForReplication_inlock that takes an integer write concern. + */ + bool _haveNumNodesReachedOpTime_inlock(const OpTime& opTime, int numNodes); + + /** + * Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a + * named write concern mode. + */ + bool _haveTaggedNodesReachedOpTime_inlock(const OpTime& opTime, + const ReplicaSetTagPattern& tagPattern); + + Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const; + + /** + * Triggers all callbacks that are blocked waiting for new heartbeat data + * to decide whether or not to finish a step down. + * Should only be called from executor callbacks. 
+ */ + void _signalStepDownWaitersFromCallback(const ReplicationExecutor::CallbackArgs& cbData); + void _signalStepDownWaiters(); + + /** + * Helper for stepDown run within a ReplicationExecutor callback. This method assumes + * it is running within a global shared lock, and thus that no writes are going on at the + * same time. + */ + void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicationExecutor::EventHandle finishedEvent, + OperationContext* txn, + Date_t waitUntil, + Date_t stepdownUntil, + bool force, + Status* result); + + OID _getMyRID_inlock() const; + + int _getMyId_inlock() const; + + OpTime _getMyLastOptime_inlock() const; + + /** + * Bottom half of setFollowerMode. + * + * May reschedule itself after the current election, so it is not sufficient to + * wait for a callback scheduled to execute this method to complete. Instead, + * supply an event, "finishedSettingFollowerMode", and wait for that event to + * be signaled. Do not observe "*success" until after the event is signaled. + */ + void _setFollowerModeFinish(const ReplicationExecutor::CallbackArgs& cbData, + const MemberState& newState, + const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, + bool* success); + + /** + * Helper method for updating our tracking of the last optime applied by a given node. + * This is only valid to call on replica sets. + * "configVersion" will be populated with our config version if it and the configVersion + * of "args" differ. + */ + Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args, + long long* configVersion); + + /** + * Helper method for setMyLastOptime that takes in a unique lock on + * _mutex. The passed in lock must already be locked. It is unspecified what state the + * lock will be in after this method finishes. + * + * This function has the same rules for "opTime" as setMyLastOptime(), unless + * "isRollbackAllowed" is true. 
+ */ + void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock, + const OpTime& opTime, + bool isRollbackAllowed); + + /** + * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index + * into the replica set config members array that corresponds to the "target", or -1 if + * "target" is not in _rsConfig. + */ + void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when); + + /** + * Processes each heartbeat response. + * + * Schedules additional heartbeats, triggers elections and step downs, etc. + */ + void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, + int targetIndex); + + void _handleHeartbeatResponseV1(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, + int targetIndex); + + void _trackHeartbeatHandle(const StatusWith<ReplicationExecutor::CallbackHandle>& handle); + + void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); + + /** + * Helper for _handleHeartbeatResponse. + * + * Updates the optime associated with the member at "memberIndex" in our config. + */ + void _updateOpTimeFromHeartbeat_inlock(int memberIndex, const OpTime& optime); + + /** + * Starts a heartbeat for each member in the current config. Called within the executor + * context. + */ + void _startHeartbeats(); + + /** + * Cancels all heartbeats. Called within executor context. + */ + void _cancelHeartbeats(); + + /** + * Asynchronously sends a heartbeat to "target". "targetIndex" is the index + * into the replica set config members array that corresponds to the "target", or -1 if + * we don't have a valid replica set config. + * + * Scheduled by _scheduleHeartbeatToTarget. + */ + void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, + const HostAndPort& target, + int targetIndex); + + + MemberState _getMemberState_inlock() const; + + /** + * Callback that gives the TopologyCoordinator an initial LastVote document from + * local storage. 
+ * + * Called only during replication startup. All other updates come from the + * TopologyCoordinator itself. + */ + void _updateLastVote(const LastVote& lastVote); + + /** + * Starts loading the replication configuration from local storage, and if it is valid, + * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set + * config (sets _rsConfig and _selfIndex). + * Returns true if it finishes loading the local config, which most likely means there + * was no local config at all or it was invalid in some way, and false if there was a valid + * config detected but more work is needed to set it as the local config (which will be + * handled by the callback to _finishLoadLocalConfig). + */ + bool _startLoadLocalConfig(OperationContext* txn); + + /** + * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState + * to kConfigSteady, so that we can begin processing heartbeats and reconfigs. + */ + void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& localConfig, + const StatusWith<OpTime>& lastOpTimeStatus); + + /** + * Callback that finishes the work of processReplSetInitiate() inside the replication + * executor context, in the event of a successful quorum check. + */ + void _finishReplSetInitiate(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig, + int myIndex); + + /** + * Callback that finishes the work of processReplSetReconfig inside the replication + * executor context, in the event of a successful quorum check. + */ + void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig, + int myIndex); + + /** + * Changes _rsConfigState to newState, and notifies any waiters. + */ + void _setConfigState_inlock(ConfigState newState); + + /** + * Updates the cached value, _memberState, to match _topCoord's reported + * member state, from getMemberState(). 
+ * + * Returns an enum indicating what action to take after releasing _mutex, if any. + * Call _performPostMemberStateUpdateAction on the return value after releasing + * _mutex. + */ + PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(); + + /** + * Performs a post member-state update action. Do not call while holding _mutex. + */ + void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action); + + /** + * Begins an attempt to elect this node. + * Called after an incoming heartbeat changes this node's view of the set such that it + * believes it can be elected PRIMARY. + * For proper concurrency, must be called via a ReplicationExecutor callback. + * + * For old style elections the election path is: + * _startElectSelf() + * _onFreshnessCheckComplete() + * _onElectCmdRunnerComplete() + * For V1 (raft) style elections the election path is: + * _startElectSelfV1() + * _onDryRunComplete() + * _onVoteRequestComplete() + * _onElectionWinnerDeclarerComplete() + */ + void _startElectSelf(); + void _startElectSelfV1(); + + /** + * Callback called when the FreshnessChecker has completed; checks the results and + * decides whether to continue election proceedings. + **/ + void _onFreshnessCheckComplete(); + + /** + * Callback called when the ElectCmdRunner has completed; checks the results and + * decides whether to complete the election and change state to primary. + **/ + void _onElectCmdRunnerComplete(); + + /** + * Callback called when the dryRun VoteRequester has completed; checks the results and + * decides whether to conduct a proper election. + * "originalTerm" was the term during which the dry run began, if the term has since + * changed, do not run for election. + */ + void _onDryRunComplete(long long originalTerm); + + /** + * Callback called when the VoteRequester has completed; checks the results and + * decides whether to change state to primary and alert other nodes of our primary-ness. 
+ * "originalTerm" was the term during which the election began, if the term has since + * changed, do not step up as primary. + */ + void _onVoteRequestComplete(long long originalTerm); + + /** + * Callback called when the ElectionWinnerDeclarer has completed; checks the results and + * if we received any negative responses, relinquish primary. + */ + void _onElectionWinnerDeclarerComplete(); + + /** + * Callback called after a random delay, to prevent repeated election ties. + */ + void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData); + + /** + * Chooses a new sync source. Must be scheduled as a callback. + * + * Calls into the Topology Coordinator, which uses its current view of the set to choose + * the most appropriate sync source. + */ + void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + HostAndPort* newSyncSource); + + /** + * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot + * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be + * run at 'until'. + * + * Must be scheduled as a callback. + */ + void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + const HostAndPort& host, + Date_t until); + + /** + * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply + * ignored and no error is thrown. + * + * Must be scheduled as a callback. + */ + void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + const HostAndPort& host); + + /** + * Determines if a new sync source should be considered. + * + * Must be scheduled as a callback. + */ + void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + const HostAndPort& currentSource, + bool* shouldChange); + + /** + * Schedules a request that the given host step down; logs any errors. 
+ */ + void _requestRemotePrimaryStepdown(const HostAndPort& target); + + void _heartbeatStepDownStart(); + + /** + * Completes a step-down of the current node. Must be run with a global + * shared or global exclusive lock. + */ + void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData); + + /** + * Schedules a replica set config change. + */ + void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig); + + /** + * Callback that continues a heartbeat-initiated reconfig after a running election + * completes. + */ + void _heartbeatReconfigAfterElectionCanceled(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig); + + /** + * Method to write a configuration transmitted via heartbeat message to stable storage. + */ + void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd, + const ReplicaSetConfig& newConfig); + + /** + * Conclusion actions of a heartbeat-triggered reconfiguration. + */ + void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig, + StatusWith<int> myIndex); + + /** + * Utility method that schedules or performs actions specified by a HeartbeatResponseAction + * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given + * value of "responseStatus". + */ + void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action, + const StatusWith<ReplSetHeartbeatResponse>& responseStatus); + + /** + * Bottom half of processHeartbeat(), which runs in the replication executor. + */ + void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetHeartbeatArgs& args, + ReplSetHeartbeatResponse* response, + Status* outStatus); + + /** + * Bottom half of processHeartbeatV1(), which runs in the replication executor. 
+ */ + void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetHeartbeatArgsV1& args, + ReplSetHeartbeatResponse* response, + Status* outStatus); + /** + * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of + * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry. + */ + void _updateLastCommittedOpTime_inlock(); + + void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData, + ReplSetHtmlSummary* output); + + /** + * Callback that gets the current term from topology coordinator. + */ + void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term); + + + /** + * Callback that attempts to set the current term in topology coordinator and + * relinquishes primary if the term actually changes and we are primary. + */ + void _updateTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, + long long term, + bool* updated, + Handle* cbHandle); + bool _updateTerm_incallback(long long term, Handle* cbHandle); + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access in any way from any context. + // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing; + // Access in any context. + // (M) Reads and writes guarded by _mutex + // (X) Reads and writes must be performed in a callback in _replExecutor + // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold + // _mutex or be in a callback in _replExecutor to read. + // (GX) Readable under a global intent lock. Must either hold global lock in exclusive + // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor + // context to write. 
+ // (I) Independently synchronized, see member variable comment. + + // Protects member data of this ReplicationCoordinator. + mutable stdx::mutex _mutex; // (S) + + // Handles to actively queued heartbeats. + HeartbeatHandles _heartbeatHandles; // (X) + + // When this node does not know itself to be a member of a config, it adds + // every host that sends it a heartbeat request to this set, and also starts + // sending heartbeat requests to that host. This set is cleared whenever + // a node discovers that it is a member of a config. + unordered_set<HostAndPort> _seedList; // (X) + + // Parsed command line arguments related to replication. + const ReplSettings _settings; // (R) + + // Mode of replication specified by _settings. + const Mode _replMode; // (R) + + // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator. + std::unique_ptr<TopologyCoordinator> _topCoord; // (X) + + // If the executor is owned then this will be set, but should not be used. + // This is only used to clean up and destroy the replExec if owned. + std::unique_ptr<ReplicationExecutor> _replExecutorIfOwned; // (S) + // Executor that drives the topology coordinator. + ReplicationExecutor& _replExecutor; // (S) + + // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator. + std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS) + + // Thread that drives actions in the topology coordinator. + // Set in startReplication() and thereafter accessed in shutdown. + std::unique_ptr<stdx::thread> _topCoordDriverThread; // (I) + + // Our RID, used to identify us to our sync source when sending replication progress + // updates upstream. Set once in startReplication() and then never modified again. + OID _myRID; // (M) + + // Rollback ID. Used to check if a rollback happened during some interval of time + // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. 
+ int _rbid; // (M) + + // list of information about clients waiting on replication. Does *not* own the + // WaiterInfos. + std::vector<WaiterInfo*> _replicationWaiterList; // (M) + + // list of information about clients waiting for a particular opTime. + // Does *not* own the WaiterInfos. + std::vector<WaiterInfo*> _opTimeWaiterList; // (M) + + // Set to true when we are in the process of shutting down replication. + bool _inShutdown; // (M) + + // Election ID of the last election that resulted in this node becoming primary. + OID _electionId; // (M) + + // Vector containing known information about each member (such as replication + // progress and member ID) in our replica set or each member replicating from + // us in a master-slave deployment. In master/slave, the first entry is + // guaranteed to correspond to ourself. In replica sets where we don't have a + // valid config or are in state REMOVED then the vector will be a single element + // just with info about ourself. In replica sets with a valid config the elements + // will be in the same order as the members in the replica set config, thus + // the entry for ourself will be at _thisMemberConfigIndex. + SlaveInfoVector _slaveInfo; // (M) + + // Current ReplicaSet state. + MemberState _memberState; // (MX) + + // True if we are waiting for the applier to finish draining. + bool _isWaitingForDrainToComplete; // (M) + + // Used to signal threads waiting for changes to _rsConfigState. + stdx::condition_variable _rsConfigStateChange; // (M) + + // Represents the configuration state of the coordinator, which controls how and when + // _rsConfig may change. See the state transition diagram in the type definition of + // ConfigState for details. + ConfigState _rsConfigState; // (M) + + // The current ReplicaSet configuration object, including the information about tag groups + // that is used to satisfy write concern requests with named gle modes. 
+ ReplicaSetConfig _rsConfig; // (MX) + + // This member's index position in the current config. + int _selfIndex; // (MX) + + // Vector of events that should be signaled whenever new heartbeat data comes in. + std::vector<ReplicationExecutor::EventHandle> _stepDownWaiters; // (X) + + // State for conducting an election of this node. + // the presence of a non-null _freshnessChecker pointer indicates that an election is + // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer + // indicates this instead. + // Only one election is allowed at a time. + std::unique_ptr<FreshnessChecker> _freshnessChecker; // (X) - std::unique_ptr<ElectionWinnerDeclarer> _electionWinnerDeclarer; // (X) + std::unique_ptr<ElectCmdRunner> _electCmdRunner; // (X) - // Event that the election code will signal when the in-progress election completes. - // Unspecified value when _freshnessChecker is NULL. - ReplicationExecutor::EventHandle _electionFinishedEvent; // (X) + std::unique_ptr<VoteRequester> _voteRequester; // (X) - // Whether we slept last time we attempted an election but possibly tied with other nodes. - bool _sleptLastElection; // (X) + std::unique_ptr<ElectionWinnerDeclarer> _electionWinnerDeclarer; // (X) - // Flag that indicates whether writes to databases other than "local" are allowed. Used to - // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions. - // Always true for standalone nodes and masters in master-slave relationships. - bool _canAcceptNonLocalWrites; // (GX) + // Event that the election code will signal when the in-progress election completes. + // Unspecified value when _freshnessChecker is NULL. + ReplicationExecutor::EventHandle _electionFinishedEvent; // (X) - // Flag that indicates whether reads from databases other than "local" are allowed. 
Unlike - // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries, - // and we do not require that its observers be strongly synchronized. Accidentally - // providing the prior value for a limited period of time is acceptable. Also unlike - // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries. - AtomicUInt32 _canServeNonLocalReads; // (S) + // Whether we slept last time we attempted an election but possibly tied with other nodes. + bool _sleptLastElection; // (X) - // OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo. - OpTime _lastCommittedOpTime; // (M) + // Flag that indicates whether writes to databases other than "local" are allowed. Used to + // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions. + // Always true for standalone nodes and masters in master-slave relationships. + bool _canAcceptNonLocalWrites; // (GX) - // Data Replicator used to replicate data - DataReplicator _dr; // (S) + // Flag that indicates whether reads from databases other than "local" are allowed. Unlike + // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries, + // and we do not require that its observers be strongly synchronized. Accidentally + // providing the prior value for a limited period of time is acceptable. Also unlike + // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries. + AtomicUInt32 _canServeNonLocalReads; // (S) - }; + // OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo. + OpTime _lastCommittedOpTime; // (M) + + // Data Replicator used to replicate data + DataReplicator _dr; // (S) +}; -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |