summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.h')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h1772
1 files changed, 877 insertions, 895 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 8fcd9671dae..7183145abcd 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -53,991 +53,973 @@
namespace mongo {
- class Timer;
- template <typename T> class StatusWith;
+class Timer;
+template <typename T>
+class StatusWith;
namespace repl {
- class ElectCmdRunner;
- class ElectionWinnerDeclarer;
- class FreshnessChecker;
- class HandshakeArgs;
- class HeartbeatResponseAction;
- class LastVote;
- class OplogReader;
- class ReplSetDeclareElectionWinnerArgs;
- class ReplSetRequestVotesArgs;
- class ReplicaSetConfig;
- class SyncSourceFeedback;
- class TopologyCoordinator;
- class VoteRequester;
+class ElectCmdRunner;
+class ElectionWinnerDeclarer;
+class FreshnessChecker;
+class HandshakeArgs;
+class HeartbeatResponseAction;
+class LastVote;
+class OplogReader;
+class ReplSetDeclareElectionWinnerArgs;
+class ReplSetRequestVotesArgs;
+class ReplicaSetConfig;
+class SyncSourceFeedback;
+class TopologyCoordinator;
+class VoteRequester;
- class ReplicationCoordinatorImpl : public ReplicationCoordinator,
- public KillOpListenerInterface {
- MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
+class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface {
+ MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
- public:
+public:
+ // Takes ownership of the "externalState", "topCoord" and "network" objects.
+ ReplicationCoordinatorImpl(const ReplSettings& settings,
+ ReplicationCoordinatorExternalState* externalState,
+ executor::NetworkInterface* network,
+ StorageInterface* storage,
+ TopologyCoordinator* topoCoord,
+ int64_t prngSeed);
+ // Takes ownership of the "externalState" and "topCoord" objects.
+ ReplicationCoordinatorImpl(const ReplSettings& settings,
+ ReplicationCoordinatorExternalState* externalState,
+ TopologyCoordinator* topoCoord,
+ ReplicationExecutor* replExec,
+ int64_t prngSeed);
+ virtual ~ReplicationCoordinatorImpl();
- // Takes ownership of the "externalState", "topCoord" and "network" objects.
- ReplicationCoordinatorImpl(const ReplSettings& settings,
- ReplicationCoordinatorExternalState* externalState,
- executor::NetworkInterface* network,
- StorageInterface* storage,
- TopologyCoordinator* topoCoord,
- int64_t prngSeed);
- // Takes ownership of the "externalState" and "topCoord" objects.
- ReplicationCoordinatorImpl(const ReplSettings& settings,
- ReplicationCoordinatorExternalState* externalState,
- TopologyCoordinator* topoCoord,
- ReplicationExecutor* replExec,
- int64_t prngSeed);
- virtual ~ReplicationCoordinatorImpl();
+ // ================== Members of public ReplicationCoordinator API ===================
- // ================== Members of public ReplicationCoordinator API ===================
+ virtual void startReplication(OperationContext* txn) override;
- virtual void startReplication(OperationContext* txn) override;
+ virtual void shutdown() override;
- virtual void shutdown() override;
+ virtual const ReplSettings& getSettings() const override;
- virtual const ReplSettings& getSettings() const override;
+ virtual Mode getReplicationMode() const override;
- virtual Mode getReplicationMode() const override;
+ virtual MemberState getMemberState() const override;
- virtual MemberState getMemberState() const override;
+ virtual bool isInPrimaryOrSecondaryState() const override;
- virtual bool isInPrimaryOrSecondaryState() const override;
+ virtual Seconds getSlaveDelaySecs() const override;
- virtual Seconds getSlaveDelaySecs() const override;
+ virtual void clearSyncSourceBlacklist() override;
- virtual void clearSyncSourceBlacklist() override;
+ /*
+ * Implementation of the KillOpListenerInterface interrupt method so that we can wake up
+ * threads blocked in awaitReplication() when a killOp command comes in.
+ */
+ virtual void interrupt(unsigned opId);
- /*
- * Implementation of the KillOpListenerInterface interrupt method so that we can wake up
- * threads blocked in awaitReplication() when a killOp command comes in.
- */
- virtual void interrupt(unsigned opId);
+ /*
+ * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up
+ * threads blocked in awaitReplication() when we kill all operations.
+ */
+ virtual void interruptAll();
- /*
- * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up
- * threads blocked in awaitReplication() when we kill all operations.
- */
- virtual void interruptAll();
+ virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
+ OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern);
- virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
- OperationContext* txn,
- const OpTime& opTime,
- const WriteConcernOptions& writeConcern);
+ virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient(
+ OperationContext* txn, const WriteConcernOptions& writeConcern);
- virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient(
- OperationContext* txn,
- const WriteConcernOptions& writeConcern);
+ virtual Status stepDown(OperationContext* txn,
+ bool force,
+ const Milliseconds& waitTime,
+ const Milliseconds& stepdownTime);
- virtual Status stepDown(OperationContext* txn,
- bool force,
- const Milliseconds& waitTime,
- const Milliseconds& stepdownTime);
+ virtual bool isMasterForReportingPurposes();
- virtual bool isMasterForReportingPurposes();
+ virtual bool canAcceptWritesForDatabase(StringData dbName);
- virtual bool canAcceptWritesForDatabase(StringData dbName);
+ bool canAcceptWritesFor(const NamespaceString& ns) override;
- bool canAcceptWritesFor(const NamespaceString& ns) override;
+ virtual Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions& writeConcern) const;
- virtual Status checkIfWriteConcernCanBeSatisfied(
- const WriteConcernOptions& writeConcern) const;
+ virtual Status checkCanServeReadsFor(OperationContext* txn,
+ const NamespaceString& ns,
+ bool slaveOk);
- virtual Status checkCanServeReadsFor(OperationContext* txn,
- const NamespaceString& ns,
- bool slaveOk);
+ virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
- virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
+ virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts);
- virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts);
+ virtual void setMyLastOptime(const OpTime& opTime);
- virtual void setMyLastOptime(const OpTime& opTime);
+ virtual void resetMyLastOptime();
- virtual void resetMyLastOptime();
+ virtual void setMyHeartbeatMessage(const std::string& msg);
- virtual void setMyHeartbeatMessage(const std::string& msg);
+ virtual OpTime getMyLastOptime() const override;
- virtual OpTime getMyLastOptime() const override;
+ virtual ReadAfterOpTimeResponse waitUntilOpTime(OperationContext* txn,
+ const ReadAfterOpTimeArgs& settings) override;
- virtual ReadAfterOpTimeResponse waitUntilOpTime(
- OperationContext* txn,
- const ReadAfterOpTimeArgs& settings) override;
+ virtual OID getElectionId() override;
- virtual OID getElectionId() override;
+ virtual OID getMyRID() const override;
- virtual OID getMyRID() const override;
+ virtual int getMyId() const override;
- virtual int getMyId() const override;
+ virtual bool setFollowerMode(const MemberState& newState) override;
- virtual bool setFollowerMode(const MemberState& newState) override;
+ virtual bool isWaitingForApplierToDrain() override;
- virtual bool isWaitingForApplierToDrain() override;
+ virtual void signalDrainComplete(OperationContext* txn) override;
- virtual void signalDrainComplete(OperationContext* txn) override;
+ virtual void signalUpstreamUpdater() override;
- virtual void signalUpstreamUpdater() override;
+ virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override;
- virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) override;
+ virtual Status processReplSetGetStatus(BSONObjBuilder* result) override;
- virtual Status processReplSetGetStatus(BSONObjBuilder* result) override;
+ virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
- virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
+ virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
- virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
+ virtual ReplicaSetConfig getConfig() const override;
- virtual ReplicaSetConfig getConfig() const override;
+ virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
+ virtual Status setMaintenanceMode(bool activate) override;
- virtual Status setMaintenanceMode(bool activate) override;
+ virtual bool getMaintenanceMode() override;
- virtual bool getMaintenanceMode() override;
+ virtual Status processReplSetSyncFrom(const HostAndPort& target,
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetSyncFrom(const HostAndPort& target,
- BSONObjBuilder* resultObj) override;
+ virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
- virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
+ virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
+ ReplSetHeartbeatResponse* response) override;
- virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response) override;
+ virtual Status processReplSetReconfig(OperationContext* txn,
+ const ReplSetReconfigArgs& args,
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetReconfig(OperationContext* txn,
- const ReplSetReconfigArgs& args,
- BSONObjBuilder* resultObj) override;
+ virtual Status processReplSetInitiate(OperationContext* txn,
+ const BSONObj& configObj,
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetInitiate(OperationContext* txn,
- const BSONObj& configObj,
- BSONObjBuilder* resultObj) override;
+ virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
+ virtual void incrementRollbackID() override;
- virtual void incrementRollbackID() override;
+ virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
+ BSONObjBuilder* resultObj) override;
- virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
- BSONObjBuilder* resultObj) override;
+ virtual Status processReplSetElect(const ReplSetElectArgs& args,
+ BSONObjBuilder* response) override;
- virtual Status processReplSetElect(const ReplSetElectArgs& args,
- BSONObjBuilder* response) override;
+ virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
+ long long* configVersion) override;
- virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
- long long* configVersion) override;
+ virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake) override;
- virtual Status processHandshake(OperationContext* txn,
- const HandshakeArgs& handshake) override;
+ virtual bool buildsIndexes() override;
- virtual bool buildsIndexes() override;
+ virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op) override;
- virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op) override;
+ virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
+ virtual WriteConcernOptions getGetLastErrorDefault() override;
- virtual WriteConcernOptions getGetLastErrorDefault() override;
+ virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
- virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
+ virtual bool isReplEnabled() const override;
- virtual bool isReplEnabled() const override;
+ virtual HostAndPort chooseNewSyncSource() override;
- virtual HostAndPort chooseNewSyncSource() override;
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn) override;
- virtual void resetLastOpTimeFromOplog(OperationContext* txn) override;
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override;
+ virtual OpTime getLastCommittedOpTime() const override;
- virtual OpTime getLastCommittedOpTime() const override;
+ virtual Status processReplSetRequestVotes(OperationContext* txn,
+ const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response) override;
- virtual Status processReplSetRequestVotes(OperationContext* txn,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response) override;
+ virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
+ long long* responseTerm) override;
- virtual Status processReplSetDeclareElectionWinner(
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm) override;
+ virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
+ virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
+ ReplSetHeartbeatResponse* response) override;
- virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response) override;
+ virtual bool isV1ElectionProtocol() override;
- virtual bool isV1ElectionProtocol() override;
+ virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
- virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
+ /**
+ * Get current term from topology coordinator
+ */
+ virtual long long getTerm() override;
- /**
- * Get current term from topology coordinator
- */
- virtual long long getTerm() override;
+ virtual bool updateTerm(long long term) override;
- virtual bool updateTerm(long long term) override;
+ // ================== Test support API ===================
- // ================== Test support API ===================
+ /**
+ * If called after startReplication(), blocks until all asynchronous
+ * activities associated with replication start-up complete.
+ */
+ void waitForStartUpComplete();
- /**
- * If called after startReplication(), blocks until all asynchronous
- * activities associated with replication start-up complete.
- */
- void waitForStartUpComplete();
+ /**
+ * Gets the replica set configuration in use by the node.
+ */
+ ReplicaSetConfig getReplicaSetConfig_forTest();
- /**
- * Gets the replica set configuration in use by the node.
- */
- ReplicaSetConfig getReplicaSetConfig_forTest();
+ /**
+ * Simple wrapper around _setLastOptime_inlock to make it easier to test.
+ */
+ Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
- /**
- * Simple wrapper around _setLastOptime_inlock to make it easier to test.
- */
- Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
-
- bool updateTerm_forTest(long long term);
-
- private:
- ReplicationCoordinatorImpl(const ReplSettings& settings,
- ReplicationCoordinatorExternalState* externalState,
- TopologyCoordinator* topCoord,
- int64_t prngSeed,
- executor::NetworkInterface* network,
- StorageInterface* storage,
- ReplicationExecutor* replExec);
- /**
- * Configuration states for a replica set node.
- *
- * Transition diagram:
- *
- * PreStart ------------------> ReplicationDisabled
- * |
- * |
- * v
- * StartingUp -------> Uninitialized <------> Initiating
- * \ ^ |
- * ------- | |
- * | | |
- * v v |
- * Reconfig <---> Steady <----> HBReconfig |
- * ^ /
- * | /
- * \ /
- * -----------------------
- */
- enum ConfigState {
- kConfigPreStart,
- kConfigStartingUp,
- kConfigReplicationDisabled,
- kConfigUninitialized,
- kConfigSteady,
- kConfigInitiating,
- kConfigReconfiguring,
- kConfigHBReconfiguring
- };
-
- /**
- * Type describing actions to take after a change to the MemberState _memberState.
- */
- enum PostMemberStateUpdateAction {
- kActionNone,
- kActionCloseAllConnections, // Also indicates that we should clear sharding state.
- kActionFollowerModeStateChange,
- kActionWinElection
- };
-
- // Struct that holds information about clients waiting for replication.
- struct WaiterInfo;
-
- // Struct that holds information about nodes in this replication group, mainly used for
- // tracking replication progress for write concern satisfaction.
- struct SlaveInfo {
- OpTime opTime; // Our last known OpTime that this slave has replicated to.
- HostAndPort hostAndPort; // Client address of the slave.
- int memberId; // Id of the node in the replica set config, or -1 if we're not a replSet.
- OID rid; // RID of the node.
- bool self; // Whether this SlaveInfo stores the information about ourself
- SlaveInfo() : memberId(-1), self(false) {}
- };
-
- typedef std::vector<SlaveInfo> SlaveInfoVector;
-
- typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles;
-
- /**
- * Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer
- * to it, or returns NULL if there is no SlaveInfo with the given RID.
- */
- SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid);
-
- /**
- * Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a
- * pointer to it, or returns NULL if there is no SlaveInfo with the given member ID.
- */
- SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID);
-
- /**
- * Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication
- * that now have their write concern satisfied. Only valid to call in master/slave setups.
- */
- void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo);
-
- /**
- * Updates the item in _slaveInfo pointed to by 'slaveInfo' with the given OpTime 'opTime'
- * and wakes up any threads waiting for replication that now have their write concern
- * satisfied.
- */
- void _updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime);
-
- /**
- * Returns the index into _slaveInfo where data corresponding to ourself is stored.
- * For more info on the rules about how we know where our entry is, see the comment for
- * _slaveInfo.
- */
- size_t _getMyIndexInSlaveInfo_inlock() const;
-
- /**
- * Helper method that removes entries from _slaveInfo if they correspond to a node
- * with a member ID that is not in the current replica set config. Will always leave an
- * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the
- * config.
- */
- void _updateSlaveInfoFromConfig_inlock();
-
- /**
- * Helper to update our saved config, cancel any pending heartbeats, and kick off sending
- * new heartbeats based on the new config. Must *only* be called from within the
- * ReplicationExecutor context.
- *
- * Returns an action to be performed after unlocking _mutex, via
- * _performPostMemberStateUpdateAction.
- */
- PostMemberStateUpdateAction _setCurrentRSConfig_inlock(
- const ReplicaSetConfig& newConfig,
- int myIndex);
-
- /**
- * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
- */
- void _wakeReadyWaiters_inlock();
-
- /**
- * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode()
- * to run in a global write lock in the replication executor thread.
- */
- void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool activate,
- Status* result);
-
- /**
- * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run
- * in the replication executor thread.
- */
- void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool* maintenanceMode);
-
- /**
- * Bottom half of fillIsMasterForReplSet.
- */
- void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData,
- IsMasterResponse* result);
-
- /**
- * Bottom half of processReplSetFresh.
- */
- void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetFreshArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetElect.
- */
- void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetElectArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetFreeze.
- */
- void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData,
- int secs,
- BSONObjBuilder* response,
- Status* result);
- /*
- * Bottom half of clearSyncSourceBlacklist
- */
- void _clearSyncSourceBlacklist_finish(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
- * Bottom half of processReplSetDeclareElectionWinner.
- */
- void _processReplSetDeclareElectionWinner_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm,
- Status* result);
-
- /**
- * Bottom half of processReplSetRequestVotes.
- */
- void _processReplSetRequestVotes_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- Status* result);
-
- /**
- * Scheduled to cause the ReplicationCoordinator to reconsider any state that might
- * need to change as a result of time passing - for instance becoming PRIMARY when a single
- * node replica set member's stepDown period ends.
- */
- void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
- * Helper method for _awaitReplication that takes an already locked unique_lock and a
- * Timer for timing the operation which has been counting since before the lock was
- * acquired.
- */
- ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock(
- const Timer* timer,
- stdx::unique_lock<stdx::mutex>* lock,
- OperationContext* txn,
- const OpTime& opTime,
- const WriteConcernOptions& writeConcern);
-
- /*
- * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
- */
- bool _doneWaitingForReplication_inlock(const OpTime& opTime,
- const WriteConcernOptions& writeConcern);
-
- /**
- * Helper for _doneWaitingForReplication_inlock that takes an integer write concern.
- */
- bool _haveNumNodesReachedOpTime_inlock(const OpTime& opTime, int numNodes);
-
- /**
- * Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a
- * named write concern mode.
- */
- bool _haveTaggedNodesReachedOpTime_inlock(const OpTime& opTime,
- const ReplicaSetTagPattern& tagPattern);
-
- Status _checkIfWriteConcernCanBeSatisfied_inlock(
- const WriteConcernOptions& writeConcern) const;
-
- /**
- * Triggers all callbacks that are blocked waiting for new heartbeat data
- * to decide whether or not to finish a step down.
- * Should only be called from executor callbacks.
- */
- void _signalStepDownWaitersFromCallback(const ReplicationExecutor::CallbackArgs& cbData);
- void _signalStepDownWaiters();
-
- /**
- * Helper for stepDown run within a ReplicationExecutor callback. This method assumes
- * it is running within a global shared lock, and thus that no writes are going on at the
- * same time.
- */
- void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::EventHandle finishedEvent,
- OperationContext* txn,
- Date_t waitUntil,
- Date_t stepdownUntil,
- bool force,
- Status* result);
-
- OID _getMyRID_inlock() const;
-
- int _getMyId_inlock() const;
-
- OpTime _getMyLastOptime_inlock() const;
-
- /**
- * Bottom half of setFollowerMode.
- *
- * May reschedule itself after the current election, so it is not sufficient to
- * wait for a callback scheduled to execute this method to complete. Instead,
- * supply an event, "finishedSettingFollowerMode", and wait for that event to
- * be signaled. Do not observe "*success" until after the event is signaled.
- */
- void _setFollowerModeFinish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const MemberState& newState,
- const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
- bool* success);
-
- /**
- * Helper method for updating our tracking of the last optime applied by a given node.
- * This is only valid to call on replica sets.
- * "configVersion" will be populated with our config version if it and the configVersion
- * of "args" differ.
- */
- Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
- long long* configVersion);
-
- /**
- * Helper method for setMyLastOptime that takes in a unique lock on
- * _mutex. The passed in lock must already be locked. It is unspecified what state the
- * lock will be in after this method finishes.
- *
- * This function has the same rules for "opTime" as setMyLastOptime(), unless
- * "isRollbackAllowed" is true.
- */
- void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock,
- const OpTime& opTime,
- bool isRollbackAllowed);
-
- /**
- * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
- * into the replica set config members array that corresponds to the "target", or -1 if
- * "target" is not in _rsConfig.
- */
- void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when);
-
- /**
- * Processes each heartbeat response.
- *
- * Schedules additional heartbeats, triggers elections and step downs, etc.
- */
- void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
- int targetIndex);
-
- void _handleHeartbeatResponseV1(
- const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
- int targetIndex);
-
- void _trackHeartbeatHandle(const StatusWith<ReplicationExecutor::CallbackHandle>& handle);
-
- void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
-
- /**
- * Helper for _handleHeartbeatResponse.
- *
- * Updates the optime associated with the member at "memberIndex" in our config.
- */
- void _updateOpTimeFromHeartbeat_inlock(int memberIndex, const OpTime& optime);
-
- /**
- * Starts a heartbeat for each member in the current config. Called within the executor
- * context.
- */
- void _startHeartbeats();
-
- /**
- * Cancels all heartbeats. Called within executor context.
- */
- void _cancelHeartbeats();
-
- /**
- * Asynchronously sends a heartbeat to "target". "targetIndex" is the index
- * into the replica set config members array that corresponds to the "target", or -1 if
- * we don't have a valid replica set config.
- *
- * Scheduled by _scheduleHeartbeatToTarget.
- */
- void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
- const HostAndPort& target,
- int targetIndex);
-
-
- MemberState _getMemberState_inlock() const;
-
- /**
- * Callback that gives the TopologyCoordinator an initial LastVote document from
- * local storage.
- *
- * Called only during replication startup. All other updates come from the
- * TopologyCoordinator itself.
- */
- void _updateLastVote(const LastVote& lastVote);
-
- /**
- * Starts loading the replication configuration from local storage, and if it is valid,
- * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
- * config (sets _rsConfig and _thisMembersConfigIndex).
- * Returns true if it finishes loading the local config, which most likely means there
- * was no local config at all or it was invalid in some way, and false if there was a valid
- * config detected but more work is needed to set it as the local config (which will be
- * handled by the callback to _finishLoadLocalConfig).
- */
- bool _startLoadLocalConfig(OperationContext* txn);
-
- /**
- * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState
- * to kConfigSteady, so that we can begin processing heartbeats and reconfigs.
- */
- void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& localConfig,
- const StatusWith<OpTime>& lastOpTimeStatus);
-
- /**
- * Callback that finishes the work of processReplSetInitiate() inside the replication
- * executor context, in the event of a successful quorum check.
- */
- void _finishReplSetInitiate(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
-
- /**
- * Callback that finishes the work of processReplSetReconfig inside the replication
- * executor context, in the event of a successful quorum check.
- */
- void _finishReplSetReconfig(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
-
- /**
- * Changes _rsConfigState to newState, and notify any waiters.
- */
- void _setConfigState_inlock(ConfigState newState);
-
- /**
- * Updates the cached value, _memberState, to match _topCoord's reported
- * member state, from getMemberState().
- *
- * Returns an enum indicating what action to take after releasing _mutex, if any.
- * Call performPostMemberStateUpdateAction on the return value after releasing
- * _mutex.
- */
- PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock();
-
- /**
- * Performs a post member-state update action. Do not call while holding _mutex.
- */
- void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);
-
- /**
- * Begins an attempt to elect this node.
- * Called after an incoming heartbeat changes this node's view of the set such that it
- * believes it can be elected PRIMARY.
- * For proper concurrency, must be called via a ReplicationExecutor callback.
- *
- * For old style elections the election path is:
- * _startElectSelf()
- * _onFreshnessCheckComplete()
- * _onElectCmdRunnerComplete()
- * For V1 (raft) style elections the election path is:
- * _startElectSelfV1()
- * _onDryRunComplete()
- * _onVoteRequestComplete()
- * _onElectionWinnerDeclarerComplete()
- */
- void _startElectSelf();
- void _startElectSelfV1();
-
- /**
- * Callback called when the FreshnessChecker has completed; checks the results and
- * decides whether to continue election proceedings.
- **/
- void _onFreshnessCheckComplete();
-
- /**
- * Callback called when the ElectCmdRunner has completed; checks the results and
- * decides whether to complete the election and change state to primary.
- **/
- void _onElectCmdRunnerComplete();
-
- /**
- * Callback called when the dryRun VoteRequester has completed; checks the results and
- * decides whether to conduct a proper election.
- * "originalTerm" was the term during which the dry run began, if the term has since
- * changed, do not run for election.
- */
- void _onDryRunComplete(long long originalTerm);
-
- /**
- * Callback called when the VoteRequester has completed; checks the results and
- * decides whether to change state to primary and alert other nodes of our primary-ness.
- * "originalTerm" was the term during which the election began, if the term has since
- * changed, do not step up as primary.
- */
- void _onVoteRequestComplete(long long originalTerm);
-
- /**
- * Callback called when the ElectWinnerDeclarer has completed; checks the results and
- * if we received any negative responses, relinquish primary.
- */
- void _onElectionWinnerDeclarerComplete();
-
- /**
- * Callback called after a random delay, to prevent repeated election ties.
- */
- void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
- * Chooses a new sync source. Must be scheduled as a callback.
- *
- * Calls into the Topology Coordinator, which uses its current view of the set to choose
- * the most appropriate sync source.
- */
- void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- HostAndPort* newSyncSource);
-
- /**
- * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot
- * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be
- * run at 'until'.
- *
- * Must be scheduled as a callback.
- */
- void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& host,
- Date_t until);
-
- /**
- * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
- * ignored and no error is thrown.
- *
- * Must be scheduled as a callback.
- */
- void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& host);
-
- /**
- * Determines if a new sync source should be considered.
- *
- * Must be scheduled as a callback.
- */
- void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& currentSource,
- bool* shouldChange);
-
- /**
- * Schedules a request that the given host step down; logs any errors.
- */
- void _requestRemotePrimaryStepdown(const HostAndPort& target);
-
- void _heartbeatStepDownStart();
-
- /**
- * Completes a step-down of the current node. Must be run with a global
- * shared or global exclusive lock.
- */
- void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
- * Schedules a replica set config change.
- */
- void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig);
-
- /**
- * Callback that continues a heartbeat-initiated reconfig after a running election
- * completes.
- */
- void _heartbeatReconfigAfterElectionCanceled(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig);
-
- /**
- * Method to write a configuration transmitted via heartbeat message to stable storage.
- */
- void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd,
- const ReplicaSetConfig& newConfig);
-
- /**
- * Conclusion actions of a heartbeat-triggered reconfiguration.
- */
- void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- StatusWith<int> myIndex);
-
- /**
- * Utility method that schedules or performs actions specified by a HeartbeatResponseAction
- * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
- * value of "responseStatus".
- */
- void _handleHeartbeatResponseAction(
- const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
-
- /**
- * Bottom half of processHeartbeat(), which runs in the replication executor.
- */
- void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
-
- /**
- * Bottom half of processHeartbeatV1(), which runs in the replication executor.
- */
- void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
- /**
- * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
- * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry.
- */
- void _updateLastCommittedOpTime_inlock();
-
- void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData,
- ReplSetHtmlSummary* output);
-
- /**
- * Callback that gets the current term from topology coordinator.
- */
- void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term);
-
-
- /**
- * Callback that attempts to set the current term in topology coordinator and
- * relinquishes primary if the term actually changes and we are primary.
- */
- void _updateTerm_helper(const ReplicationExecutor::CallbackArgs& cbData,
- long long term,
- bool* updated,
- Handle* cbHandle);
- bool _updateTerm_incallback(long long term, Handle* cbHandle);
-
- //
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (R) Read-only in concurrent operation; no synchronization required.
- // (S) Self-synchronizing; access in any way from any context.
- // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
- // Access in any context.
- // (M) Reads and writes guarded by _mutex
- // (X) Reads and writes must be performed in a callback in _replExecutor
- // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold
- // _mutex or be in a callback in _replExecutor to read.
- // (GX) Readable under a global intent lock. Must either hold global lock in exclusive
- // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor
- // context to write.
- // (I) Independently synchronized, see member variable comment.
-
- // Protects member data of this ReplicationCoordinator.
- mutable stdx::mutex _mutex; // (S)
-
- // Handles to actively queued heartbeats.
- HeartbeatHandles _heartbeatHandles; // (X)
-
- // When this node does not know itself to be a member of a config, it adds
- // every host that sends it a heartbeat request to this set, and also starts
- // sending heartbeat requests to that host. This set is cleared whenever
- // a node discovers that it is a member of a config.
- unordered_set<HostAndPort> _seedList; // (X)
-
- // Parsed command line arguments related to replication.
- const ReplSettings _settings; // (R)
-
- // Mode of replication specified by _settings.
- const Mode _replMode; // (R)
-
- // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
- std::unique_ptr<TopologyCoordinator> _topCoord; // (X)
-
- // If the executer is owned then this will be set, but should not be used.
- // This is only used to clean up and destroy the replExec if owned
- std::unique_ptr<ReplicationExecutor> _replExecutorIfOwned; // (S)
- // Executor that drives the topology coordinator.
- ReplicationExecutor& _replExecutor; // (S)
-
- // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
- std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS)
-
- // Thread that drives actions in the topology coordinator
- // Set in startReplication() and thereafter accessed in shutdown.
- std::unique_ptr<stdx::thread> _topCoordDriverThread; // (I)
-
- // Our RID, used to identify us to our sync source when sending replication progress
- // updates upstream. Set once in startReplication() and then never modified again.
- OID _myRID; // (M)
-
- // Rollback ID. Used to check if a rollback happened during some interval of time
- // TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
- int _rbid; // (M)
-
- // list of information about clients waiting on replication. Does *not* own the
- // WaiterInfos.
- std::vector<WaiterInfo*> _replicationWaiterList; // (M)
-
- // list of information about clients waiting for a particular opTime.
- // Does *not* own the WaiterInfos.
- std::vector<WaiterInfo*> _opTimeWaiterList; // (M)
-
- // Set to true when we are in the process of shutting down replication.
- bool _inShutdown; // (M)
-
- // Election ID of the last election that resulted in this node becoming primary.
- OID _electionId; // (M)
-
- // Vector containing known information about each member (such as replication
- // progress and member ID) in our replica set or each member replicating from
- // us in a master-slave deployment. In master/slave, the first entry is
- // guaranteed to correspond to ourself. In replica sets where we don't have a
- // valid config or are in state REMOVED then the vector will be a single element
- // just with info about ourself. In replica sets with a valid config the elements
- // will be in the same order as the members in the replica set config, thus
- // the entry for ourself will be at _thisMemberConfigIndex.
- SlaveInfoVector _slaveInfo; // (M)
-
- // Current ReplicaSet state.
- MemberState _memberState; // (MX)
-
- // True if we are waiting for the applier to finish draining.
- bool _isWaitingForDrainToComplete; // (M)
-
- // Used to signal threads waiting for changes to _rsConfigState.
- stdx::condition_variable _rsConfigStateChange; // (M)
-
- // Represents the configuration state of the coordinator, which controls how and when
- // _rsConfig may change. See the state transition diagram in the type definition of
- // ConfigState for details.
- ConfigState _rsConfigState; // (M)
-
- // The current ReplicaSet configuration object, including the information about tag groups
- // that is used to satisfy write concern requests with named gle modes.
- ReplicaSetConfig _rsConfig; // (MX)
-
- // This member's index position in the current config.
- int _selfIndex; // (MX)
-
- // Vector of events that should be signaled whenever new heartbeat data comes in.
- std::vector<ReplicationExecutor::EventHandle> _stepDownWaiters; // (X)
+ bool updateTerm_forTest(long long term);
- // State for conducting an election of this node.
- // the presence of a non-null _freshnessChecker pointer indicates that an election is
- // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
- // indicates this instead.
- // Only one election is allowed at a time.
- std::unique_ptr<FreshnessChecker> _freshnessChecker; // (X)
+private:
+ ReplicationCoordinatorImpl(const ReplSettings& settings,
+ ReplicationCoordinatorExternalState* externalState,
+ TopologyCoordinator* topCoord,
+ int64_t prngSeed,
+ executor::NetworkInterface* network,
+ StorageInterface* storage,
+ ReplicationExecutor* replExec);
+ /**
+ * Configuration states for a replica set node.
+ *
+ * Transition diagram:
+ *
+ * PreStart ------------------> ReplicationDisabled
+ * |
+ * |
+ * v
+ * StartingUp -------> Uninitialized <------> Initiating
+ * \ ^ |
+ * ------- | |
+ * | | |
+ * v v |
+ * Reconfig <---> Steady <----> HBReconfig |
+ * ^ /
+ * | /
+ * \ /
+ * -----------------------
+ */
+ enum ConfigState {
+ kConfigPreStart,
+ kConfigStartingUp,
+ kConfigReplicationDisabled,
+ kConfigUninitialized,
+ kConfigSteady,
+ kConfigInitiating,
+ kConfigReconfiguring,
+ kConfigHBReconfiguring
+ };
+
+ /**
+ * Type describing actions to take after a change to the MemberState _memberState.
+ */
+ enum PostMemberStateUpdateAction {
+ kActionNone,
+ kActionCloseAllConnections, // Also indicates that we should clear sharding state.
+ kActionFollowerModeStateChange,
+ kActionWinElection
+ };
- std::unique_ptr<ElectCmdRunner> _electCmdRunner; // (X)
+ // Struct that holds information about clients waiting for replication.
+ struct WaiterInfo;
+
+ // Struct that holds information about nodes in this replication group, mainly used for
+ // tracking replication progress for write concern satisfaction.
+ struct SlaveInfo {
+ OpTime opTime; // Our last known OpTime that this slave has replicated to.
+ HostAndPort hostAndPort; // Client address of the slave.
+ int memberId; // Id of the node in the replica set config, or -1 if we're not a replSet.
+ OID rid; // RID of the node.
+ bool self; // Whether this SlaveInfo stores the information about ourself
+ SlaveInfo() : memberId(-1), self(false) {}
+ };
- std::unique_ptr<VoteRequester> _voteRequester; // (X)
+ typedef std::vector<SlaveInfo> SlaveInfoVector;
+
+ typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles;
+
+ /**
+ * Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer
+ * to it, or returns NULL if there is no SlaveInfo with the given RID.
+ */
+ SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid);
+
+ /**
+ * Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a
+ * pointer to it, or returns NULL if there is no SlaveInfo with the given member ID.
+ */
+ SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID);
+
+ /**
+ * Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication
+ * that now have their write concern satisfied. Only valid to call in master/slave setups.
+ */
+ void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo);
+
+ /**
+ * Updates the item in _slaveInfo pointed to by 'slaveInfo' with the given OpTime 'opTime'
+ * and wakes up any threads waiting for replication that now have their write concern
+ * satisfied.
+ */
+ void _updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime);
+
+ /**
+ * Returns the index into _slaveInfo where data corresponding to ourself is stored.
+ * For more info on the rules about how we know where our entry is, see the comment for
+ * _slaveInfo.
+ */
+ size_t _getMyIndexInSlaveInfo_inlock() const;
+
+ /**
+ * Helper method that removes entries from _slaveInfo if they correspond to a node
+ * with a member ID that is not in the current replica set config. Will always leave an
+ * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the
+ * config.
+ */
+ void _updateSlaveInfoFromConfig_inlock();
+
+ /**
+ * Helper to update our saved config, cancel any pending heartbeats, and kick off sending
+ * new heartbeats based on the new config. Must *only* be called from within the
+ * ReplicationExecutor context.
+ *
+ * Returns an action to be performed after unlocking _mutex, via
+ * _performPostMemberStateUpdateAction.
+ */
+ PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
+ int myIndex);
+
+ /**
+ * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
+ */
+ void _wakeReadyWaiters_inlock();
+
+ /**
+ * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode()
+ * to run in a global write lock in the replication executor thread.
+ */
+ void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
+ bool activate,
+ Status* result);
+
+ /**
+ * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run
+ * in the replication executor thread.
+ */
+ void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
+ bool* maintenanceMode);
+
+ /**
+ * Bottom half of fillIsMasterForReplSet.
+ */
+ void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ IsMasterResponse* result);
+
+ /**
+ * Bottom half of processReplSetFresh.
+ */
+ void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetFreshArgs& args,
+ BSONObjBuilder* response,
+ Status* result);
+
+ /**
+ * Bottom half of processReplSetElect.
+ */
+ void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetElectArgs& args,
+ BSONObjBuilder* response,
+ Status* result);
+
+ /**
+ * Bottom half of processReplSetFreeze.
+ */
+ void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ int secs,
+ BSONObjBuilder* response,
+ Status* result);
+ /*
+ * Bottom half of clearSyncSourceBlacklist
+ */
+ void _clearSyncSourceBlacklist_finish(const ReplicationExecutor::CallbackArgs& cbData);
+
+ /**
+ * Bottom half of processReplSetDeclareElectionWinner.
+ */
+ void _processReplSetDeclareElectionWinner_finish(
+ const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetDeclareElectionWinnerArgs& args,
+ long long* responseTerm,
+ Status* result);
+
+ /**
+ * Bottom half of processReplSetRequestVotes.
+ */
+ void _processReplSetRequestVotes_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response,
+ Status* result);
+
+ /**
+ * Scheduled to cause the ReplicationCoordinator to reconsider any state that might
+ * need to change as a result of time passing - for instance becoming PRIMARY when a single
+ * node replica set member's stepDown period ends.
+ */
+ void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData);
+
+ /**
+ * Helper method for _awaitReplication that takes an already locked unique_lock and a
+ * Timer for timing the operation which has been counting since before the lock was
+ * acquired.
+ */
+ ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock(
+ const Timer* timer,
+ stdx::unique_lock<stdx::mutex>* lock,
+ OperationContext* txn,
+ const OpTime& opTime,
+ const WriteConcernOptions& writeConcern);
+
+ /*
+ * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
+ */
+ bool _doneWaitingForReplication_inlock(const OpTime& opTime,
+ const WriteConcernOptions& writeConcern);
+
+ /**
+ * Helper for _doneWaitingForReplication_inlock that takes an integer write concern.
+ */
+ bool _haveNumNodesReachedOpTime_inlock(const OpTime& opTime, int numNodes);
+
+ /**
+ * Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a
+ * named write concern mode.
+ */
+ bool _haveTaggedNodesReachedOpTime_inlock(const OpTime& opTime,
+ const ReplicaSetTagPattern& tagPattern);
+
+ Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;
+
+ /**
+ * Triggers all callbacks that are blocked waiting for new heartbeat data
+ * to decide whether or not to finish a step down.
+ * Should only be called from executor callbacks.
+ */
+ void _signalStepDownWaitersFromCallback(const ReplicationExecutor::CallbackArgs& cbData);
+ void _signalStepDownWaiters();
+
+ /**
+ * Helper for stepDown run within a ReplicationExecutor callback. This method assumes
+ * it is running within a global shared lock, and thus that no writes are going on at the
+ * same time.
+ */
+ void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicationExecutor::EventHandle finishedEvent,
+ OperationContext* txn,
+ Date_t waitUntil,
+ Date_t stepdownUntil,
+ bool force,
+ Status* result);
+
+ OID _getMyRID_inlock() const;
+
+ int _getMyId_inlock() const;
+
+ OpTime _getMyLastOptime_inlock() const;
+
+ /**
+ * Bottom half of setFollowerMode.
+ *
+ * May reschedule itself after the current election, so it is not sufficient to
+ * wait for a callback scheduled to execute this method to complete. Instead,
+ * supply an event, "finishedSettingFollowerMode", and wait for that event to
+ * be signaled. Do not observe "*success" until after the event is signaled.
+ */
+ void _setFollowerModeFinish(const ReplicationExecutor::CallbackArgs& cbData,
+ const MemberState& newState,
+ const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
+ bool* success);
+
+ /**
+ * Helper method for updating our tracking of the last optime applied by a given node.
+ * This is only valid to call on replica sets.
+ * "configVersion" will be populated with our config version if it and the configVersion
+ * of "args" differ.
+ */
+ Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
+ long long* configVersion);
+
+ /**
+ * Helper method for setMyLastOptime that takes in a unique lock on
+ * _mutex. The passed in lock must already be locked. It is unspecified what state the
+ * lock will be in after this method finishes.
+ *
+ * This function has the same rules for "opTime" as setMyLastOptime(), unless
+ * "isRollbackAllowed" is true.
+ */
+ void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock,
+ const OpTime& opTime,
+ bool isRollbackAllowed);
+
+ /**
+ * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
+ * into the replica set config members array that corresponds to the "target", or -1 if
+ * "target" is not in _rsConfig.
+ */
+ void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when);
+
+ /**
+ * Processes each heartbeat response.
+ *
+ * Schedules additional heartbeats, triggers elections and step downs, etc.
+ */
+ void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
+ int targetIndex);
+
+ void _handleHeartbeatResponseV1(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
+ int targetIndex);
+
+ void _trackHeartbeatHandle(const StatusWith<ReplicationExecutor::CallbackHandle>& handle);
+
+ void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
+
+ /**
+ * Helper for _handleHeartbeatResponse.
+ *
+ * Updates the optime associated with the member at "memberIndex" in our config.
+ */
+ void _updateOpTimeFromHeartbeat_inlock(int memberIndex, const OpTime& optime);
+
+ /**
+ * Starts a heartbeat for each member in the current config. Called within the executor
+ * context.
+ */
+ void _startHeartbeats();
+
+ /**
+ * Cancels all heartbeats. Called within executor context.
+ */
+ void _cancelHeartbeats();
+
+ /**
+ * Asynchronously sends a heartbeat to "target". "targetIndex" is the index
+ * into the replica set config members array that corresponds to the "target", or -1 if
+ * we don't have a valid replica set config.
+ *
+ * Scheduled by _scheduleHeartbeatToTarget.
+ */
+ void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
+ const HostAndPort& target,
+ int targetIndex);
+
+
+ MemberState _getMemberState_inlock() const;
+
+ /**
+ * Callback that gives the TopologyCoordinator an initial LastVote document from
+ * local storage.
+ *
+ * Called only during replication startup. All other updates come from the
+ * TopologyCoordinator itself.
+ */
+ void _updateLastVote(const LastVote& lastVote);
+
+ /**
+ * Starts loading the replication configuration from local storage, and if it is valid,
+ * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
+ * config (sets _rsConfig and _thisMembersConfigIndex).
+ * Returns true if it finishes loading the local config, which most likely means there
+ * was no local config at all or it was invalid in some way, and false if there was a valid
+ * config detected but more work is needed to set it as the local config (which will be
+ * handled by the callback to _finishLoadLocalConfig).
+ */
+ bool _startLoadLocalConfig(OperationContext* txn);
+
+ /**
+ * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState
+ * to kConfigSteady, so that we can begin processing heartbeats and reconfigs.
+ */
+ void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicaSetConfig& localConfig,
+ const StatusWith<OpTime>& lastOpTimeStatus);
+
+ /**
+ * Callback that finishes the work of processReplSetInitiate() inside the replication
+ * executor context, in the event of a successful quorum check.
+ */
+ void _finishReplSetInitiate(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicaSetConfig& newConfig,
+ int myIndex);
+
+ /**
+ * Callback that finishes the work of processReplSetReconfig inside the replication
+ * executor context, in the event of a successful quorum check.
+ */
+ void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicaSetConfig& newConfig,
+ int myIndex);
+
+ /**
+ * Changes _rsConfigState to newState, and notify any waiters.
+ */
+ void _setConfigState_inlock(ConfigState newState);
+
+ /**
+ * Updates the cached value, _memberState, to match _topCoord's reported
+ * member state, from getMemberState().
+ *
+ * Returns an enum indicating what action to take after releasing _mutex, if any.
+ * Call performPostMemberStateUpdateAction on the return value after releasing
+ * _mutex.
+ */
+ PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock();
+
+ /**
+ * Performs a post member-state update action. Do not call while holding _mutex.
+ */
+ void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);
+
+ /**
+ * Begins an attempt to elect this node.
+ * Called after an incoming heartbeat changes this node's view of the set such that it
+ * believes it can be elected PRIMARY.
+ * For proper concurrency, must be called via a ReplicationExecutor callback.
+ *
+ * For old style elections the election path is:
+ * _startElectSelf()
+ * _onFreshnessCheckComplete()
+ * _onElectCmdRunnerComplete()
+ * For V1 (raft) style elections the election path is:
+ * _startElectSelfV1()
+ * _onDryRunComplete()
+ * _onVoteRequestComplete()
+ * _onElectionWinnerDeclarerComplete()
+ */
+ void _startElectSelf();
+ void _startElectSelfV1();
+
+ /**
+ * Callback called when the FreshnessChecker has completed; checks the results and
+ * decides whether to continue election proceedings.
+ **/
+ void _onFreshnessCheckComplete();
+
+ /**
+ * Callback called when the ElectCmdRunner has completed; checks the results and
+ * decides whether to complete the election and change state to primary.
+ **/
+ void _onElectCmdRunnerComplete();
+
+ /**
+ * Callback called when the dryRun VoteRequester has completed; checks the results and
+ * decides whether to conduct a proper election.
+ * "originalTerm" was the term during which the dry run began, if the term has since
+ * changed, do not run for election.
+ */
+ void _onDryRunComplete(long long originalTerm);
+
+ /**
+ * Callback called when the VoteRequester has completed; checks the results and
+ * decides whether to change state to primary and alert other nodes of our primary-ness.
+ * "originalTerm" was the term during which the election began, if the term has since
+ * changed, do not step up as primary.
+ */
+ void _onVoteRequestComplete(long long originalTerm);
+
+ /**
+ * Callback called when the ElectWinnerDeclarer has completed; checks the results and
+ * if we received any negative responses, relinquish primary.
+ */
+ void _onElectionWinnerDeclarerComplete();
+
+ /**
+ * Callback called after a random delay, to prevent repeated election ties.
+ */
+ void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData);
+
+ /**
+ * Chooses a new sync source. Must be scheduled as a callback.
+ *
+ * Calls into the Topology Coordinator, which uses its current view of the set to choose
+ * the most appropriate sync source.
+ */
+ void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
+ HostAndPort* newSyncSource);
+
+ /**
+ * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot
+ * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be
+ * run at 'until'.
+ *
+ * Must be scheduled as a callback.
+ */
+ void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
+ const HostAndPort& host,
+ Date_t until);
+
+ /**
+ * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
+ * ignored and no error is thrown.
+ *
+ * Must be scheduled as a callback.
+ */
+ void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
+ const HostAndPort& host);
+
+ /**
+ * Determines if a new sync source should be considered.
+ *
+ * Must be scheduled as a callback.
+ */
+ void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
+ const HostAndPort& currentSource,
+ bool* shouldChange);
+
+ /**
+ * Schedules a request that the given host step down; logs any errors.
+ */
+ void _requestRemotePrimaryStepdown(const HostAndPort& target);
+
+ void _heartbeatStepDownStart();
+
+ /**
+ * Completes a step-down of the current node. Must be run with a global
+ * shared or global exclusive lock.
+ */
+ void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData);
+
+ /**
+ * Schedules a replica set config change.
+ */
+ void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig);
+
+ /**
+ * Callback that continues a heartbeat-initiated reconfig after a running election
+ * completes.
+ */
+ void _heartbeatReconfigAfterElectionCanceled(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicaSetConfig& newConfig);
+
+ /**
+ * Method to write a configuration transmitted via heartbeat message to stable storage.
+ */
+ void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd,
+ const ReplicaSetConfig& newConfig);
+
+ /**
+ * Conclusion actions of a heartbeat-triggered reconfiguration.
+ */
+ void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicaSetConfig& newConfig,
+ StatusWith<int> myIndex);
+
+ /**
+ * Utility method that schedules or performs actions specified by a HeartbeatResponseAction
+ * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
+ * value of "responseStatus".
+ */
+ void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
+
+ /**
+ * Bottom half of processHeartbeat(), which runs in the replication executor.
+ */
+ void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetHeartbeatArgs& args,
+ ReplSetHeartbeatResponse* response,
+ Status* outStatus);
+
+ /**
+ * Bottom half of processHeartbeatV1(), which runs in the replication executor.
+ */
+ void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplSetHeartbeatArgsV1& args,
+ ReplSetHeartbeatResponse* response,
+ Status* outStatus);
+ /**
+ * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
+ * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry.
+ */
+ void _updateLastCommittedOpTime_inlock();
+
+ void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ ReplSetHtmlSummary* output);
+
+ /**
+ * Callback that gets the current term from topology coordinator.
+ */
+ void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term);
+
+
+ /**
+ * Callback that attempts to set the current term in topology coordinator and
+ * relinquishes primary if the term actually changes and we are primary.
+ */
+ void _updateTerm_helper(const ReplicationExecutor::CallbackArgs& cbData,
+ long long term,
+ bool* updated,
+ Handle* cbHandle);
+ bool _updateTerm_incallback(long long term, Handle* cbHandle);
+
+ //
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access in any way from any context.
+ // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
+ // Access in any context.
+ // (M) Reads and writes guarded by _mutex
+ // (X) Reads and writes must be performed in a callback in _replExecutor
+ // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold
+ // _mutex or be in a callback in _replExecutor to read.
+ // (GX) Readable under a global intent lock. Must either hold global lock in exclusive
+ // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor
+ // context to write.
+ // (I) Independently synchronized, see member variable comment.
+
+ // Protects member data of this ReplicationCoordinator.
+ mutable stdx::mutex _mutex; // (S)
+
+ // Handles to actively queued heartbeats.
+ HeartbeatHandles _heartbeatHandles; // (X)
+
+ // When this node does not know itself to be a member of a config, it adds
+ // every host that sends it a heartbeat request to this set, and also starts
+ // sending heartbeat requests to that host. This set is cleared whenever
+ // a node discovers that it is a member of a config.
+ unordered_set<HostAndPort> _seedList; // (X)
+
+ // Parsed command line arguments related to replication.
+ const ReplSettings _settings; // (R)
+
+ // Mode of replication specified by _settings.
+ const Mode _replMode; // (R)
+
+ // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
+ std::unique_ptr<TopologyCoordinator> _topCoord; // (X)
+
+ // If the executer is owned then this will be set, but should not be used.
+ // This is only used to clean up and destroy the replExec if owned
+ std::unique_ptr<ReplicationExecutor> _replExecutorIfOwned; // (S)
+ // Executor that drives the topology coordinator.
+ ReplicationExecutor& _replExecutor; // (S)
+
+ // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
+ std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS)
+
+ // Thread that drives actions in the topology coordinator
+ // Set in startReplication() and thereafter accessed in shutdown.
+ std::unique_ptr<stdx::thread> _topCoordDriverThread; // (I)
+
+ // Our RID, used to identify us to our sync source when sending replication progress
+ // updates upstream. Set once in startReplication() and then never modified again.
+ OID _myRID; // (M)
+
+ // Rollback ID. Used to check if a rollback happened during some interval of time
+ // TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
+ int _rbid; // (M)
+
+ // list of information about clients waiting on replication. Does *not* own the
+ // WaiterInfos.
+ std::vector<WaiterInfo*> _replicationWaiterList; // (M)
+
+ // list of information about clients waiting for a particular opTime.
+ // Does *not* own the WaiterInfos.
+ std::vector<WaiterInfo*> _opTimeWaiterList; // (M)
+
+ // Set to true when we are in the process of shutting down replication.
+ bool _inShutdown; // (M)
+
+ // Election ID of the last election that resulted in this node becoming primary.
+ OID _electionId; // (M)
+
+ // Vector containing known information about each member (such as replication
+ // progress and member ID) in our replica set or each member replicating from
+ // us in a master-slave deployment. In master/slave, the first entry is
+ // guaranteed to correspond to ourself. In replica sets where we don't have a
+ // valid config or are in state REMOVED then the vector will be a single element
+ // just with info about ourself. In replica sets with a valid config the elements
+ // will be in the same order as the members in the replica set config, thus
+ // the entry for ourself will be at _thisMemberConfigIndex.
+ SlaveInfoVector _slaveInfo; // (M)
+
+ // Current ReplicaSet state.
+ MemberState _memberState; // (MX)
+
+ // True if we are waiting for the applier to finish draining.
+ bool _isWaitingForDrainToComplete; // (M)
+
+ // Used to signal threads waiting for changes to _rsConfigState.
+ stdx::condition_variable _rsConfigStateChange; // (M)
+
+ // Represents the configuration state of the coordinator, which controls how and when
+ // _rsConfig may change. See the state transition diagram in the type definition of
+ // ConfigState for details.
+ ConfigState _rsConfigState; // (M)
+
+ // The current ReplicaSet configuration object, including the information about tag groups
+ // that is used to satisfy write concern requests with named gle modes.
+ ReplicaSetConfig _rsConfig; // (MX)
+
+ // This member's index position in the current config.
+ int _selfIndex; // (MX)
+
+ // Vector of events that should be signaled whenever new heartbeat data comes in.
+ std::vector<ReplicationExecutor::EventHandle> _stepDownWaiters; // (X)
+
+ // State for conducting an election of this node.
+ // the presence of a non-null _freshnessChecker pointer indicates that an election is
+ // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
+ // indicates this instead.
+ // Only one election is allowed at a time.
+ std::unique_ptr<FreshnessChecker> _freshnessChecker; // (X)
- std::unique_ptr<ElectionWinnerDeclarer> _electionWinnerDeclarer; // (X)
+ std::unique_ptr<ElectCmdRunner> _electCmdRunner; // (X)
- // Event that the election code will signal when the in-progress election completes.
- // Unspecified value when _freshnessChecker is NULL.
- ReplicationExecutor::EventHandle _electionFinishedEvent; // (X)
+ std::unique_ptr<VoteRequester> _voteRequester; // (X)
- // Whether we slept last time we attempted an election but possibly tied with other nodes.
- bool _sleptLastElection; // (X)
+ std::unique_ptr<ElectionWinnerDeclarer> _electionWinnerDeclarer; // (X)
- // Flag that indicates whether writes to databases other than "local" are allowed. Used to
- // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
- // Always true for standalone nodes and masters in master-slave relationships.
- bool _canAcceptNonLocalWrites; // (GX)
+ // Event that the election code will signal when the in-progress election completes.
+ // Unspecified value when _freshnessChecker is NULL.
+ ReplicationExecutor::EventHandle _electionFinishedEvent; // (X)
- // Flag that indicates whether reads from databases other than "local" are allowed. Unlike
- // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
- // and we do not require that its observers be strongly synchronized. Accidentally
- // providing the prior value for a limited period of time is acceptable. Also unlike
- // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
- AtomicUInt32 _canServeNonLocalReads; // (S)
+ // Whether we slept last time we attempted an election but possibly tied with other nodes.
+ bool _sleptLastElection; // (X)
- // OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo.
- OpTime _lastCommittedOpTime; // (M)
+ // Flag that indicates whether writes to databases other than "local" are allowed. Used to
+ // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
+ // Always true for standalone nodes and masters in master-slave relationships.
+ bool _canAcceptNonLocalWrites; // (GX)
- // Data Replicator used to replicate data
- DataReplicator _dr; // (S)
+ // Flag that indicates whether reads from databases other than "local" are allowed. Unlike
+ // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
+ // and we do not require that its observers be strongly synchronized. Accidentally
+ // providing the prior value for a limited period of time is acceptable. Also unlike
+ // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
+ AtomicUInt32 _canServeNonLocalReads; // (S)
- };
+ // OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo.
+ OpTime _lastCommittedOpTime; // (M)
+
+ // Data Replicator used to replicate data
+ DataReplicator _dr; // (S)
+};
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo