/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #pragma once #include #include #include #include #include #include "mongo/base/status.h" #include "mongo/bson/timestamp.h" #include "mongo/db/service_context.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/unordered_map.h" #include "mongo/platform/unordered_set.h" #include "mongo/util/net/hostandport.h" namespace mongo { class Timer; template class StatusWith; namespace repl { class ElectCmdRunner; class FreshnessChecker; class HandshakeArgs; class HeartbeatResponseAction; class OplogReader; class ReplSetDeclareElectionWinnerArgs; class ReplSetRequestVotesArgs; class ReplicaSetConfig; class SyncSourceFeedback; class TopologyCoordinator; class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface { MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); public: // Takes ownership of the "externalState", "topCoord" and "network" objects. ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, ReplicationExecutor::NetworkInterface* network, TopologyCoordinator* topoCoord, int64_t prngSeed); // Takes ownership of the "externalState" and "topCoord" objects. 
ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topoCoord, ReplicationExecutor* replExec, int64_t prngSeed); virtual ~ReplicationCoordinatorImpl(); // ================== Members of public ReplicationCoordinator API =================== virtual void startReplication(OperationContext* txn); virtual void shutdown(); virtual const ReplSettings& getSettings() const; virtual Mode getReplicationMode() const; virtual MemberState getMemberState() const; virtual bool isInPrimaryOrSecondaryState() const; virtual Seconds getSlaveDelaySecs() const; virtual void clearSyncSourceBlacklist(); /* * Implementation of the KillOpListenerInterface interrupt method so that we can wake up * threads blocked in awaitReplication() when a killOp command comes in. */ virtual void interrupt(unsigned opId); /* * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up * threads blocked in awaitReplication() when we kill all operations. 
*/ virtual void interruptAll(); virtual ReplicationCoordinator::StatusAndDuration awaitReplication( const OperationContext* txn, const Timestamp& ts, const WriteConcernOptions& writeConcern); virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient( const OperationContext* txn, const WriteConcernOptions& writeConcern); virtual Status stepDown(OperationContext* txn, bool force, const Milliseconds& waitTime, const Milliseconds& stepdownTime); virtual bool isMasterForReportingPurposes(); virtual bool canAcceptWritesForDatabase(StringData dbName); virtual Status checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const; virtual Status checkCanServeReadsFor(OperationContext* txn, const NamespaceString& ns, bool slaveOk); virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts); virtual void setMyLastOptime(const Timestamp& ts); virtual void resetMyLastOptime(); virtual void setMyHeartbeatMessage(const std::string& msg); virtual Timestamp getMyLastOptime() const; virtual OID getElectionId(); virtual OID getMyRID() const; virtual int getMyId() const; virtual bool setFollowerMode(const MemberState& newState); virtual bool isWaitingForApplierToDrain(); virtual void signalDrainComplete(OperationContext* txn); virtual void signalUpstreamUpdater(); virtual bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); virtual Status processReplSetGetStatus(BSONObjBuilder* result); virtual void fillIsMasterForReplSet(IsMasterResponse* result); virtual void appendSlaveInfoData(BSONObjBuilder* result); virtual ReplicaSetConfig getConfig() const; virtual void processReplSetGetConfig(BSONObjBuilder* result); virtual Status setMaintenanceMode(bool activate); virtual bool getMaintenanceMode(); virtual Status processReplSetSyncFrom(const HostAndPort& target, BSONObjBuilder* resultObj); virtual Status processReplSetFreeze(int secs, BSONObjBuilder* 
resultObj); virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, ReplSetHeartbeatResponse* response); virtual Status processReplSetReconfig(OperationContext* txn, const ReplSetReconfigArgs& args, BSONObjBuilder* resultObj); virtual Status processReplSetInitiate(OperationContext* txn, const BSONObj& configObj, BSONObjBuilder* resultObj); virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj); virtual void incrementRollbackID(); virtual Status processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj); virtual Status processReplSetElect(const ReplSetElectArgs& args, BSONObjBuilder* response); virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, long long* configVersion); virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake); virtual bool buildsIndexes(); virtual std::vector getHostsWrittenTo(const Timestamp& op); virtual std::vector getOtherNodesInReplSet() const; virtual WriteConcernOptions getGetLastErrorDefault(); virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); virtual bool isReplEnabled() const; virtual HostAndPort chooseNewSyncSource(); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); virtual void resetLastOpTimeFromOplog(OperationContext* txn); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); virtual Timestamp getLastCommittedOpTime() const; virtual Status processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); virtual Status processReplSetDeclareElectionWinner( const ReplSetDeclareElectionWinnerArgs& args, ReplSetDeclareElectionWinnerResponse* response); // ================== Test support API =================== /** * If called after startReplication(), blocks until all asynchronous * activities associated with replication start-up complete. */ void waitForStartUpComplete(); /** * Gets the replica set configuration in use by the node. 
*/ ReplicaSetConfig getReplicaSetConfig_forTest(); /** * Simple wrapper around _setLastOptime_inlock to make it easier to test. */ Status setLastOptime_forTest(long long cfgVer, long long memberId, const Timestamp& ts); private: ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, TopologyCoordinator* topCoord, int64_t prngSeed, ReplicationExecutor::NetworkInterface* network, ReplicationExecutor* replExec); /** * Configuration states for a replica set node. * * Transition diagram: * * PreStart ------------------> ReplicationDisabled * | * | * v * StartingUp -------> Uninitialized <------> Initiating * \ ^ | * ------- | | * | | | * v v | * Reconfig <---> Steady <----> HBReconfig | * ^ / * | / * \ / * ----------------------- */ enum ConfigState { kConfigPreStart, kConfigStartingUp, kConfigReplicationDisabled, kConfigUninitialized, kConfigSteady, kConfigInitiating, kConfigReconfiguring, kConfigHBReconfiguring }; /** * Type describing actions to take after a change to the MemberState _memberState. */ enum PostMemberStateUpdateAction { kActionNone, kActionCloseAllConnections, // Also indicates that we should clear sharding state. kActionFollowerModeStateChange, kActionWinElection }; // Struct that holds information about clients waiting for replication. struct WaiterInfo; // Struct that holds information about nodes in this replication group, mainly used for // tracking replication progress for write concern satisfaction. struct SlaveInfo { Timestamp opTime; // Our last known OpTime that this slave has replicated to. HostAndPort hostAndPort; // Client address of the slave. int memberId; // Id of the node in the replica set config, or -1 if we're not a replSet. OID rid; // RID of the node. 
bool self; // Whether this SlaveInfo stores the information about ourself SlaveInfo() : memberId(-1), self(false) {} }; typedef std::vector SlaveInfoVector; typedef std::vector HeartbeatHandles; /** * Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer * to it, or returns NULL if there is no SlaveInfo with the given RID. */ SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid); /** * Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a * pointer to it, or returns NULL if there is no SlaveInfo with the given member ID. */ SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID); /** * Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication * that now have their write concern satisfied. Only valid to call in master/slave setups. */ void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo); /** * Updates the item in _slaveInfo pointed to by 'slaveInfo' with the given OpTime 'ts' * and wakes up any threads waiting for replication that now have their write concern * satisfied. */ void _updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, Timestamp ts); /** * Returns the index into _slaveInfo where data corresponding to ourself is stored. * For more info on the rules about how we know where our entry is, see the comment for * _slaveInfo. */ size_t _getMyIndexInSlaveInfo_inlock() const; /** * Helper method that removes entries from _slaveInfo if they correspond to a node * with a member ID that is not in the current replica set config. Will always leave an * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the * config. */ void _updateSlaveInfoFromConfig_inlock(); /** * Helper to update our saved config, cancel any pending heartbeats, and kick off sending * new heartbeats based on the new config. Must *only* be called from within the * ReplicationExecutor context. 
* * Returns an action to be performed after unlocking _mutex, via * _performPostMemberStateUpdateAction. */ PostMemberStateUpdateAction _setCurrentRSConfig_inlock( const ReplicaSetConfig& newConfig, int myIndex); /** * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. */ void _wakeReadyWaiters_inlock(); /** * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode() * to run in a global write lock in the replication executor thread. */ void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackData& cbData, bool activate, Status* result); /** * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run * in the replication executor thread. */ void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackData& cbData, bool* maintenanceMode); /** * Bottom half of fillIsMasterForReplSet. */ void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackData& cbData, IsMasterResponse* result); /** * Bottom half of processReplSetFresh. */ void _processReplSetFresh_finish(const ReplicationExecutor::CallbackData& cbData, const ReplSetFreshArgs& args, BSONObjBuilder* response, Status* result); /** * Bottom half of processReplSetElect. */ void _processReplSetElect_finish(const ReplicationExecutor::CallbackData& cbData, const ReplSetElectArgs& args, BSONObjBuilder* response, Status* result); /** * Bottom half of processReplSetFreeze. */ void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackData& cbData, int secs, BSONObjBuilder* response, Status* result); /* * Bottom half of clearSyncSourceBlacklist */ void _clearSyncSourceBlacklist_finish(const ReplicationExecutor::CallbackData& cbData); /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might * need to change as a result of time passing - for instance becoming PRIMARY when a single * node replica set member's stepDown period ends. 
*/ void _handleTimePassing(const ReplicationExecutor::CallbackData& cbData); /** * Helper method for _awaitReplication that takes an already locked unique_lock and a * Timer for timing the operation which has been counting since before the lock was * acquired. */ ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock( const Timer* timer, boost::unique_lock* lock, const OperationContext* txn, const Timestamp& ts, const WriteConcernOptions& writeConcern); /* * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable. */ bool _doneWaitingForReplication_inlock(const Timestamp& opTime, const WriteConcernOptions& writeConcern); /** * Helper for _doneWaitingForReplication_inlock that takes an integer write concern. */ bool _haveNumNodesReachedOpTime_inlock(const Timestamp& opTime, int numNodes); /** * Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a * named write concern mode. */ bool _haveTaggedNodesReachedOpTime_inlock(const Timestamp& opTime, const ReplicaSetTagPattern& tagPattern); Status _checkIfWriteConcernCanBeSatisfied_inlock( const WriteConcernOptions& writeConcern) const; /** * Triggers all callbacks that are blocked waiting for new heartbeat data * to decide whether or not to finish a step down. * Should only be called from executor callbacks. */ void _signalStepDownWaitersFromCallback(const ReplicationExecutor::CallbackData& cbData); void _signalStepDownWaiters(); /** * Helper for stepDown run within a ReplicationExecutor callback. This method assumes * it is running within a global shared lock, and thus that no writes are going on at the * same time. 
*/ void _stepDownContinue(const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::EventHandle finishedEvent, OperationContext* txn, Date_t waitUntil, Date_t stepdownUntil, bool force, Status* result); OID _getMyRID_inlock() const; int _getMyId_inlock() const; Timestamp _getMyLastOptime_inlock() const; /** * Bottom half of setFollowerMode. * * May reschedule itself after the current election, so it is not sufficient to * wait for a callback scheduled to execute this method to complete. Instead, * supply an event, "finishedSettingFollowerMode", and wait for that event to * be signaled. Do not observe "*success" until after the event is signaled. */ void _setFollowerModeFinish( const ReplicationExecutor::CallbackData& cbData, const MemberState& newState, const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, bool* success); /** * Helper method for updating our tracking of the last optime applied by a given node. * This is only valid to call on replica sets. * "configVersion" will be populated with our config version if it and the configVersion * of "args" differ. */ Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args, long long* configVersion); /** * Helper method for setMyLastOptime that takes in a unique lock on * _mutex. The passed in lock must already be locked. It is unspecified what state the * lock will be in after this method finishes. * * This function has the same rules for "ts" as setMyLastOptime(), unless * "isRollbackAllowed" is true. */ void _setMyLastOptime_inlock(boost::unique_lock* lock, const Timestamp& ts, bool isRollbackAllowed); /** * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index * into the replica set config members array that corresponds to the "target", or -1 if * "target" is not in _rsConfig. */ void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when); /** * Processes each heartbeat response. 
* * Schedules additional heartbeats, triggers elections and step downs, etc. */ void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData, int targetIndex); void _trackHeartbeatHandle(const StatusWith& handle); void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); /** * Helper for _handleHeartbeatResponse. * * Updates the optime associated with the member at "memberIndex" in our config. */ void _updateOpTimeFromHeartbeat_inlock(int memberIndex, Timestamp optime); /** * Starts a heartbeat for each member in the current config. Called within the executor * context. */ void _startHeartbeats(); /** * Cancels all heartbeats. Called within executor context. */ void _cancelHeartbeats(); /** * Asynchronously sends a heartbeat to "target". "targetIndex" is the index * into the replica set config members array that corresponds to the "target", or -1 if * we don't have a valid replica set config. * * Scheduled by _scheduleHeartbeatToTarget. */ void _doMemberHeartbeat(ReplicationExecutor::CallbackData cbData, const HostAndPort& target, int targetIndex); MemberState _getMemberState_inlock() const; /** * Starts loading the replication configuration from local storage, and if it is valid, * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set * config (sets _rsConfig and _thisMembersConfigIndex). * Returns true if it finishes loading the local config, which most likely means there * was no local config at all or it was invalid in some way, and false if there was a valid * config detected but more work is needed to set it as the local config (which will be * handled by the callback to _finishLoadLocalConfig). */ bool _startLoadLocalConfig(OperationContext* txn); /** * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState * to kConfigSteady, so that we can begin processing heartbeats and reconfigs. 
*/ void _finishLoadLocalConfig(const ReplicationExecutor::CallbackData& cbData, const ReplicaSetConfig& localConfig, const StatusWith& lastOpTimeStatus); /** * Callback that finishes the work of processReplSetInitiate() inside the replication * executor context, in the event of a successful quorum check. */ void _finishReplSetInitiate( const ReplicationExecutor::CallbackData& cbData, const ReplicaSetConfig& newConfig, int myIndex); /** * Callback that finishes the work of processReplSetReconfig inside the replication * executor context, in the event of a successful quorum check. */ void _finishReplSetReconfig( const ReplicationExecutor::CallbackData& cbData, const ReplicaSetConfig& newConfig, int myIndex); /** * Changes _rsConfigState to newState, and notify any waiters. */ void _setConfigState_inlock(ConfigState newState); /** * Updates the cached value, _memberState, to match _topCoord's reported * member state, from getMemberState(). * * Returns an enum indicating what action to take after releasing _mutex, if any. * Call performPostMemberStateUpdateAction on the return value after releasing * _mutex. */ PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(); /** * Performs a post member-state update action. Do not call while holding _mutex. */ void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action); /** * Begins an attempt to elect this node. * Called after an incoming heartbeat changes this node's view of the set such that it * believes it can be elected PRIMARY. * For proper concurrency, must be called via a ReplicationExecutor callback. */ void _startElectSelf(); /** * Callback called when the FreshnessChecker has completed; checks the results and * decides whether to continue election proceedings. * finishEvh is an event that is signaled when election is complete. 
**/ void _onFreshnessCheckComplete(); /** * Callback called when the ElectCmdRunner has completed; checks the results and * decides whether to complete the election and change state to primary. * finishEvh is an event that is signaled when election is complete. **/ void _onElectCmdRunnerComplete(); /** * Callback called after a random delay, to prevent repeated election ties. */ void _recoverFromElectionTie(const ReplicationExecutor::CallbackData& cbData); /** * Chooses a new sync source. Must be scheduled as a callback. * * Calls into the Topology Coordinator, which uses its current view of the set to choose * the most appropriate sync source. */ void _chooseNewSyncSource(const ReplicationExecutor::CallbackData& cbData, HostAndPort* newSyncSource); /** * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be * run at 'until'. * * Must be scheduled as a callback. */ void _blacklistSyncSource(const ReplicationExecutor::CallbackData& cbData, const HostAndPort& host, Date_t until); /** * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply * ignored and no error is thrown. * * Must be scheduled as a callback. */ void _unblacklistSyncSource(const ReplicationExecutor::CallbackData& cbData, const HostAndPort& host); /** * Determines if a new sync source should be considered. * * Must be scheduled as a callback. */ void _shouldChangeSyncSource(const ReplicationExecutor::CallbackData& cbData, const HostAndPort& currentSource, bool* shouldChange); /** * Schedules a request that the given host step down; logs any errors. */ void _requestRemotePrimaryStepdown(const HostAndPort& target); void _heartbeatStepDownStart(); /** * Completes a step-down of the current node triggered by a heartbeat. Must * be run with a global shared or global exclusive lock. 
*/ void _heartbeatStepDownFinish(const ReplicationExecutor::CallbackData& cbData); /** * Schedules a replica set config change. */ void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig); /** * Callback that continues a heartbeat-initiated reconfig after a running election * completes. */ void _heartbeatReconfigAfterElectionCanceled( const ReplicationExecutor::CallbackData& cbData, const ReplicaSetConfig& newConfig); /** * Method to write a configuration transmitted via heartbeat message to stable storage. */ void _heartbeatReconfigStore(const ReplicaSetConfig& newConfig); /** * Conclusion actions of a heartbeat-triggered reconfiguration. */ void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackData& cbData, const ReplicaSetConfig& newConfig, StatusWith myIndex); /** * Utility method that schedules or performs actions specified by a HeartbeatResponseAction * returned by a TopologyCoordinator::processHeartbeatResponse call with the given * value of "responseStatus". */ void _handleHeartbeatResponseAction( const HeartbeatResponseAction& action, const StatusWith& responseStatus); /** * Bottom half of processHeartbeat(), which runs in the replication executor. */ void _processHeartbeatFinish(const ReplicationExecutor::CallbackData& cbData, const ReplSetHeartbeatArgs& args, ReplSetHeartbeatResponse* response, Status* outStatus); /** * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry. */ void _updateLastCommittedOpTime_inlock(); // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // // (R) Read-only in concurrent operation; no synchronization required. // (S) Self-synchronizing; access in any way from any context. // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing; // Access in any context. 
// (M) Reads and writes guarded by _mutex // (X) Reads and writes must be performed in a callback in _replExecutor // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold // _mutex or be in a callback in _replExecutor to read. // (GX) Readable under a global intent lock. Must either hold global lock in exclusive // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor // context to write. // (I) Independently synchronized, see member variable comment. // Protects member data of this ReplicationCoordinator. mutable boost::mutex _mutex; // (S) // Handles to actively queued heartbeats. HeartbeatHandles _heartbeatHandles; // (X) // When this node does not know itself to be a member of a config, it adds // every host that sends it a heartbeat request to this set, and also starts // sending heartbeat requests to that host. This set is cleared whenever // a node discovers that it is a member of a config. unordered_set _seedList; // (X) // Parsed command line arguments related to replication. const ReplSettings _settings; // (R) // Mode of replication specified by _settings. const Mode _replMode; // (R) // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator. boost::scoped_ptr _topCoord; // (X) // If the executer is owned then this will be set, but should not be used. // This is only used to clean up and destroy the replExec if owned std::unique_ptr _replExecutorIfOwned; // (S) // Executor that drives the topology coordinator. ReplicationExecutor& _replExecutor; // (S) // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator. boost::scoped_ptr _externalState; // (PS) // Thread that drives actions in the topology coordinator // Set in startReplication() and thereafter accessed in shutdown. boost::scoped_ptr _topCoordDriverThread; // (I) // Thread that is used to write new configs received via a heartbeat reconfig // to stable storage. 
It is an error to change this if _inShutdown is true. boost::scoped_ptr _heartbeatReconfigThread; // (M) // Our RID, used to identify us to our sync source when sending replication progress // updates upstream. Set once in startReplication() and then never modified again. OID _myRID; // (M) // Rollback ID. Used to check if a rollback happened during some interval of time // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. int _rbid; // (M) // list of information about clients waiting on replication. Does *not* own the // WaiterInfos. std::vector _replicationWaiterList; // (M) // Set to true when we are in the process of shutting down replication. bool _inShutdown; // (M) // Election ID of the last election that resulted in this node becoming primary. OID _electionId; // (M) // Vector containing known information about each member (such as replication // progress and member ID) in our replica set or each member replicating from // us in a master-slave deployment. In master/slave, the first entry is // guaranteed to correspond to ourself. In replica sets where we don't have a // valid config or are in state REMOVED then the vector will be a single element // just with info about ourself. In replica sets with a valid config the elements // will be in the same order as the members in the replica set config, thus // the entry for ourself will be at _thisMemberConfigIndex. SlaveInfoVector _slaveInfo; // (M) // Current ReplicaSet state. MemberState _memberState; // (MX) // True if we are waiting for the applier to finish draining. bool _isWaitingForDrainToComplete; // (M) // Used to signal threads waiting for changes to _rsConfigState. boost::condition_variable _rsConfigStateChange; // (M) // Represents the configuration state of the coordinator, which controls how and when // _rsConfig may change. See the state transition diagram in the type definition of // ConfigState for details. 
ConfigState _rsConfigState; // (M) // The current ReplicaSet configuration object, including the information about tag groups // that is used to satisfy write concern requests with named gle modes. ReplicaSetConfig _rsConfig; // (MX) // This member's index position in the current config. int _selfIndex; // (MX) // Vector of events that should be signaled whenever new heartbeat data comes in. std::vector _stepDownWaiters; // (X) // State for conducting an election of this node. // the presence of a non-null _freshnessChecker pointer indicates that an election is // currently in progress. Only one election is allowed at once. boost::scoped_ptr _freshnessChecker; // (X) boost::scoped_ptr _electCmdRunner; // (X) // Event that the election code will signal when the in-progress election completes. // Unspecified value when _freshnessChecker is NULL. ReplicationExecutor::EventHandle _electionFinishedEvent; // (X) // Whether we slept last time we attempted an election but possibly tied with other nodes. bool _sleptLastElection; // (X) // Flag that indicates whether writes to databases other than "local" are allowed. Used to // answer the canAcceptWritesForDatabase() question. Always true for standalone nodes and // masters in master-slave relationships. bool _canAcceptNonLocalWrites; // (GX) // Flag that indicates whether reads from databases other than "local" are allowed. Unlike // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries, // and we do not require that its observers be strongly synchronized. Accidentally // providing the prior value for a limited period of time is acceptable. Also unlike // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries. AtomicUInt32 _canServeNonLocalReads; // (S) // OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo. 
Timestamp _lastCommittedOpTime; // (M) // Data Replicator used to replicate data DataReplicator _dr; // (S) }; } // namespace repl } // namespace mongo