/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/old_update_position_args.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/snapshot_name.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/unordered_map.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
class Timer;
// Forward declaration of StatusWith (full definition in "mongo/base/status_with.h").
// NOTE(review): the template parameter list was stripped from the original text
// (angle-bracket contents lost); restored as the standard single-parameter form.
template <typename T>
class StatusWith;
namespace executor {
struct ConnectionPoolStats;
} // namespace executor
namespace rpc {
class ReplSetMetadata;
} // namespace rpc
namespace repl {
class ElectCmdRunner;
class FreshnessChecker;
class FreshnessScanner;
class HandshakeArgs;
class HeartbeatResponseAction;
class LastVote;
class OplogReader;
class ReplSetRequestVotesArgs;
class ReplicaSetConfig;
class SyncSourceFeedback;
class StorageInterface;
class TopologyCoordinator;
class VoteRequester;
class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface {
MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
public:
// For testing only.
using StepDownNonBlockingResult =
std::pair, ReplicationExecutor::EventHandle>;
// Takes ownership of the "externalState", "topCoord" and "network" objects.
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
executor::NetworkInterface* network,
TopologyCoordinator* topoCoord,
StorageInterface* storage,
int64_t prngSeed);
// Takes ownership of the "externalState" and "topCoord" objects.
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
TopologyCoordinator* topoCoord,
StorageInterface* storage,
ReplicationExecutor* replExec,
int64_t prngSeed,
stdx::function* isDurableStorageEngineFn);
virtual ~ReplicationCoordinatorImpl();
// ================== Members of public ReplicationCoordinator API ===================
virtual void startup(OperationContext* txn) override;
virtual void shutdown(OperationContext* txn) override;
// Returns the replication executor used by this coordinator (the address of
// _replExecutor). Callers do not take ownership.
virtual ReplicationExecutor* getExecutor() override {
return &_replExecutor;
}
virtual const ReplSettings& getSettings() const override;
virtual Mode getReplicationMode() const override;
virtual MemberState getMemberState() const override;
virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override;
virtual bool isInPrimaryOrSecondaryState() const override;
virtual Seconds getSlaveDelaySecs() const override;
virtual void clearSyncSourceBlacklist() override;
/*
* Implementation of the KillOpListenerInterface interrupt method so that we can wake up
* threads blocked in awaitReplication() when a killOp command comes in.
*/
virtual void interrupt(unsigned opId);
/*
* Implementation of the KillOpListenerInterface interruptAll method so that we can wake up
* threads blocked in awaitReplication() when we kill all operations.
*/
virtual void interruptAll();
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern);
virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient(
OperationContext* txn, const WriteConcernOptions& writeConcern);
virtual Status stepDown(OperationContext* txn,
bool force,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime);
virtual bool isMasterForReportingPurposes();
virtual bool canAcceptWritesForDatabase(StringData dbName);
bool canAcceptWritesFor(const NamespaceString& ns) override;
virtual Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions& writeConcern) const;
virtual Status checkCanServeReadsFor(OperationContext* txn,
const NamespaceString& ns,
bool slaveOk);
virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts);
virtual void setMyLastAppliedOpTime(const OpTime& opTime);
virtual void setMyLastDurableOpTime(const OpTime& opTime);
virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime);
virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
virtual void resetMyLastOpTimes();
virtual void setMyHeartbeatMessage(const std::string& msg);
virtual OpTime getMyLastAppliedOpTime() const override;
virtual OpTime getMyLastDurableOpTime() const override;
virtual Status waitUntilOpTimeForRead(OperationContext* txn,
const ReadConcernArgs& settings) override;
virtual OID getElectionId() override;
virtual OID getMyRID() const override;
virtual int getMyId() const override;
virtual bool setFollowerMode(const MemberState& newState) override;
virtual bool isWaitingForApplierToDrain() override;
virtual bool isCatchingUp() override;
virtual void signalDrainComplete(OperationContext* txn) override;
virtual Status waitForDrainFinish(Milliseconds timeout) override;
virtual void signalUpstreamUpdater() override;
virtual Status resyncData(OperationContext* txn, bool waitUntilCompleted) override;
virtual StatusWith prepareReplSetUpdatePositionCommand(
ReplSetUpdatePositionCommandStyle commandStyle) const override;
virtual Status processReplSetGetStatus(BSONObjBuilder* result,
ReplSetGetStatusResponseStyle responseStyle) override;
virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
virtual ReplicaSetConfig getConfig() const override;
virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
virtual void cancelAndRescheduleElectionTimeout() override;
virtual Status setMaintenanceMode(bool activate) override;
virtual bool getMaintenanceMode() override;
virtual Status processReplSetSyncFrom(OperationContext* txn,
const HostAndPort& target,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
ReplSetHeartbeatResponse* response) override;
virtual Status processReplSetReconfig(OperationContext* txn,
const ReplSetReconfigArgs& args,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetInitiate(OperationContext* txn,
const BSONObj& configObj,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
virtual void incrementRollbackID() override;
virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetElect(const ReplSetElectArgs& args,
BSONObjBuilder* response) override;
virtual Status processReplSetUpdatePosition(const OldUpdatePositionArgs& updates,
long long* configVersion) override;
virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
long long* configVersion) override;
virtual Status processHandshake(OperationContext* txn, const HandshakeArgs& handshake) override;
virtual bool buildsIndexes() override;
virtual std::vector getHostsWrittenTo(const OpTime& op,
bool durablyWritten) override;
virtual std::vector getOtherNodesInReplSet() const override;
virtual WriteConcernOptions getGetLastErrorDefault() override;
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
virtual bool isReplEnabled() const override;
virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
virtual void resetLastOpTimesFromOplog(OperationContext* txn) override;
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const rpc::ReplSetMetadata& metadata) override;
virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched) override;
virtual OpTime getLastCommittedOpTime() const override;
virtual Status processReplSetRequestVotes(OperationContext* txn,
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) override;
void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) override;
virtual bool isV1ElectionProtocol() const override;
virtual bool getWriteConcernMajorityShouldJournal() override;
virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
virtual void dropAllSnapshots() override;
/**
* Get current term from topology coordinator
*/
virtual long long getTerm() override;
virtual Status updateTerm(OperationContext* txn, long long term) override;
virtual SnapshotName reserveSnapshotName(OperationContext* txn) override;
virtual void forceSnapshotCreation() override;
virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
const SnapshotName& untilSnapshot) override;
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
virtual size_t getNumUncommittedSnapshots() override;
virtual WriteConcernOptions populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) override;
virtual ReplSettings::IndexPrefetchConfig getIndexPrefetchConfig() const override;
virtual void setIndexPrefetchConfig(const ReplSettings::IndexPrefetchConfig cfg) override;
virtual Status stepUpIfEligible() override;
// ================== Test support API ===================
/**
* If called after startReplication(), blocks until all asynchronous
* activities associated with replication start-up complete.
*/
void waitForStartUpComplete();
/**
* Gets the replica set configuration in use by the node.
*/
ReplicaSetConfig getReplicaSetConfig_forTest();
/**
* Returns scheduled time of election timeout callback.
* Returns Date_t() if callback is not scheduled.
*/
Date_t getElectionTimeout_forTest() const;
/**
* Returns scheduled time of priority takeover callback.
* Returns Date_t() if callback is not scheduled.
*/
Date_t getPriorityTakeover_forTest() const;
/**
* Simple wrappers around _setLastOptime_inlock to make it easier to test.
*/
Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
/**
* Non-blocking version of stepDown.
* Returns a pair of global shared lock and event handle which are used to wait for the step
* down operation to complete. The global shared lock prevents writes until the step down has
* completed (or failed).
* When the operation is complete (wait() returns), 'result' will be set to the
* final status of the operation.
* If the handle is invalid, step down failed before we could schedule the rest of
* the step down processing and the error will be available immediately in 'result'.
*/
StepDownNonBlockingResult stepDown_nonBlocking(OperationContext* txn,
bool force,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime,
Status* result);
/**
* Non-blocking version of setFollowerMode.
* Returns event handle that we can use to wait for the operation to complete.
* When the operation is complete (wait() returns), 'success' will be set to true
* if the member state has been set successfully.
*/
ReplicationExecutor::EventHandle setFollowerMode_nonBlocking(const MemberState& newState,
bool* success);
/**
* Non-blocking version of updateTerm.
* Returns event handle that we can use to wait for the operation to complete.
* When the operation is complete (waitForEvent() returns), 'updateResult' will be set
* to a status telling if the term increased or a stepdown was triggered.
*/
ReplicationExecutor::EventHandle updateTerm_forTest(
long long term, TopologyCoordinator::UpdateTermResult* updateResult);
/**
* If called after _startElectSelfV1(), blocks until all asynchronous
* activities associated with election complete.
*/
void waitForElectionFinish_forTest();
/**
* If called after _startElectSelfV1(), blocks until all asynchronous
* activities associated with election dry run complete, including writing
* last vote and scheduling the real election.
*/
void waitForElectionDryRunFinish_forTest();
private:
// Convenience aliases for the task executor's callback machinery.
using CallbackFn = executor::TaskExecutor::CallbackFn;
using CallbackHandle = executor::TaskExecutor::CallbackHandle;
using EventHandle = executor::TaskExecutor::EventHandle;

// Signature of a scheduling function: takes the work to run and returns the
// handle of the scheduled callback, or an error status. NOTE(review): the
// template arguments were stripped from the original text; restored so the
// function returns StatusWith<CallbackHandle> as the surviving parameter list
// implies.
using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
    const executor::TaskExecutor::CallbackFn& work)>;
// A named snapshot together with the optime it corresponds to.
struct SnapshotInfo {
    OpTime opTime;
    SnapshotName name;

    // All comparisons order lexicographically by (opTime, name). Only == and <
    // are implemented directly; the remaining four operators are derived from
    // them, which matches the std::tuple relational-operator definitions and
    // keeps the six operators trivially consistent with each other.
    bool operator==(const SnapshotInfo& other) const {
        return std::tie(opTime, name) == std::tie(other.opTime, other.name);
    }
    bool operator!=(const SnapshotInfo& other) const {
        return !(*this == other);
    }
    bool operator<(const SnapshotInfo& other) const {
        return std::tie(opTime, name) < std::tie(other.opTime, other.name);
    }
    bool operator<=(const SnapshotInfo& other) const {
        return !(other < *this);
    }
    bool operator>(const SnapshotInfo& other) const {
        return other < *this;
    }
    bool operator>=(const SnapshotInfo& other) const {
        return !(*this < other);
    }

    // Human-readable representation for diagnostics (defined out of line).
    std::string toString() const;
};
class LoseElectionGuardV1;
class LoseElectionDryRunGuardV1;
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
TopologyCoordinator* topCoord,
StorageInterface* storage,
int64_t prngSeed,
executor::NetworkInterface* network,
ReplicationExecutor* replExec,
stdx::function* isDurableStorageEngineFn);
/**
* Configuration states for a replica set node.
*
* Transition diagram:
*
* PreStart ------------------> ReplicationDisabled
* |
* |
* v
* StartingUp -------> Uninitialized <------> Initiating
* \ ^ |
* ------- | |
* | | |
* v v |
* Reconfig <---> Steady <----> HBReconfig |
* ^ /
* | /
* \ /
* -----------------------
*/
// Current state of this node's replica set configuration; see the transition
// diagram in the comment above for the allowed transitions between states.
enum ConfigState {
kConfigPreStart,  // Initial state, before configuration processing starts.
kConfigStartingUp,  // Loading the locally stored configuration at startup.
kConfigReplicationDisabled,  // Replication is not enabled on this node.
kConfigUninitialized,  // No replica set config installed yet (e.g. awaiting initiate).
kConfigSteady,  // A config is installed; heartbeats/reconfigs are processed normally.
kConfigInitiating,  // A replSetInitiate attempt is in progress.
kConfigReconfiguring,  // A user-initiated reconfig is in progress.
kConfigHBReconfiguring  // A heartbeat-initiated reconfig is in progress.
};
/**
* Type describing actions to take after a change to the MemberState _memberState.
*/
// Deferred actions returned while holding _mutex and executed afterwards via
// _performPostMemberStateUpdateAction (see that method's declaration below).
enum PostMemberStateUpdateAction {
kActionNone,  // No follow-up work required.
kActionCloseAllConnections,  // Also indicates that we should clear sharding state.
kActionFollowerModeStateChange,  // NOTE(review): presumably notifies of a follower-mode
                                 // state change; handler body not visible here — confirm.
kActionWinElection,  // Finish processing a won election — confirm in handler.
kActionStartSingleNodeElection  // Trigger an election for a single-node set — confirm.
};
// Struct that holds information about clients waiting for replication.
struct WaiterInfo;
struct WaiterInfoGuard;
class WaiterList {
public:
using WaiterType = WaiterInfo*;
// Adds waiter into the list. Usually, the waiter will be signaled only once and then
// removed.
void add_inlock(WaiterType waiter);
// Returns whether waiter is found and removed.
bool remove_inlock(WaiterType waiter);
// Signals and removes all waiters that satisfy the condition.
void signalAndRemoveIf_inlock(stdx::function fun);
// Signals and removes all waiters from the list.
void signalAndRemoveAll_inlock();
private:
std::vector _list;
};
// Struct that holds information about nodes in this replication group, mainly used for
// tracking replication progress for write concern satisfaction.
struct SlaveInfo {
// Our last known OpTime that this slave has applied and journaled to.
OpTime lastDurableOpTime;
// Our last known OpTime that this slave has applied, whether journaled or unjournaled.
OpTime lastAppliedOpTime;
HostAndPort hostAndPort; // Client address of the slave.
int memberId =
-1; // Id of the node in the replica set config, or -1 if we're not a replSet.
OID rid; // RID of the node.
bool self = false; // Whether this SlaveInfo stores the information about ourself
// NOTE(review): initialized to Date_t::max() — presumably so a node is not
// marked down before we have heard from it at least once; confirm.
Date_t lastUpdate =
Date_t::max(); // The last time we heard from this node; used for liveness detection
bool down = false; // Indicator set when lastUpdate time exceeds the election timeout.
// Diagnostic representations of this entry (defined out of line).
BSONObj toBSON() const;
std::string toString() const;
};
typedef std::vector SlaveInfoVector;
typedef std::vector HeartbeatHandles;
/**
* Appends a "replicationProgress" section with data for each member in set.
*/
void _appendSlaveInfoData_inlock(BSONObjBuilder* result);
/**
* Looks up the SlaveInfo in _slaveInfo associated with the given RID and returns a pointer
* to it, or returns NULL if there is no SlaveInfo with the given RID.
*/
SlaveInfo* _findSlaveInfoByRID_inlock(const OID& rid);
/**
* Looks up the SlaveInfo in _slaveInfo associated with the given member ID and returns a
* pointer to it, or returns NULL if there is no SlaveInfo with the given member ID.
*/
SlaveInfo* _findSlaveInfoByMemberID_inlock(int memberID);
/**
* Adds the given SlaveInfo to _slaveInfo and wakes up any threads waiting for replication
* that now have their write concern satisfied. Only valid to call in master/slave setups.
*/
void _addSlaveInfo_inlock(const SlaveInfo& slaveInfo);
/**
* Updates the durableOpTime field on the item in _slaveInfo pointed to by 'slaveInfo' with the
* given OpTime 'opTime' and wakes up any threads waiting for replication that now have their
* write concern satisfied.
*/
void _updateSlaveInfoDurableOpTime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime);
/**
* Updates the appliedOpTime field on the item in _slaveInfo pointed to by 'slaveInfo' with the
* given OpTime 'opTime' and wakes up any threads waiting for replication that now have their
* write concern satisfied.
*/
void _updateSlaveInfoAppliedOpTime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime);
/**
* Returns the index into _slaveInfo where data corresponding to ourself is stored.
* For more info on the rules about how we know where our entry is, see the comment for
* _slaveInfo.
*/
size_t _getMyIndexInSlaveInfo_inlock() const;
void _resetMyLastOpTimes_inlock();
/**
* Returns the _writeConcernMajorityJournalDefault of our current _rsConfig.
*/
bool getWriteConcernMajorityShouldJournal_inlock() const;
/**
* Helper method that removes entries from _slaveInfo if they correspond to a node
* with a member ID that is not in the current replica set config. Will always leave an
* entry for ourself at the beginning of _slaveInfo, even if we aren't present in the
* config.
*/
void _updateSlaveInfoFromConfig_inlock();
/**
* Helper to update our saved config, cancel any pending heartbeats, and kick off sending
* new heartbeats based on the new config. Must *only* be called from within the
* ReplicationExecutor context.
*
* Returns an action to be performed after unlocking _mutex, via
* _performPostMemberStateUpdateAction.
*/
PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
int myIndex);
/**
* Updates the last committed OpTime to be "committedOpTime" if it is more recent than the
* current last committed OpTime.
*/
void _setLastCommittedOpTime(const OpTime& committedOpTime);
void _setLastCommittedOpTime_inlock(const OpTime& committedOpTime);
/**
* Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
*/
void _wakeReadyWaiters_inlock();
/**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
* node replica set member's stepDown period ends.
*/
void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData);
/**
* Helper method for _awaitReplication that takes an already locked unique_lock and a
* Timer for timing the operation which has been counting since before the lock was
* acquired.
*/
ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock(
const Timer* timer,
stdx::unique_lock* lock,
OperationContext* txn,
const OpTime& opTime,
SnapshotName minSnapshot,
const WriteConcernOptions& writeConcern);
/**
* Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
*
* If the writeConcern is 'majority', also waits for _currentCommittedSnapshot to be newer than
* minSnapshot.
*/
bool _doneWaitingForReplication_inlock(const OpTime& opTime,
SnapshotName minSnapshot,
const WriteConcernOptions& writeConcern);
/**
* Helper for _doneWaitingForReplication_inlock that takes an integer write concern.
* "durablyWritten" indicates whether the operation has to be durably applied.
*/
bool _haveNumNodesReachedOpTime_inlock(const OpTime& opTime, int numNodes, bool durablyWritten);
/**
* Helper for _doneWaitingForReplication_inlock that takes a tag pattern representing a
* named write concern mode.
* "durablyWritten" indicates whether the operation has to be durably applied.
*/
bool _haveTaggedNodesReachedOpTime_inlock(const OpTime& opTime,
const ReplicaSetTagPattern& tagPattern,
bool durablyWritten);
Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;
/**
* Triggers all callbacks that are blocked waiting for new heartbeat data
* to decide whether or not to finish a step down.
*/
void _signalStepDownWaiter_inlock();
/**
* Helper for stepDown run within a ReplicationExecutor callback. This method assumes
* it is running within a global shared lock, and thus that no writes are going on at the
* same time.
*/
void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent,
OperationContext* txn,
Date_t waitUntil,
Date_t stepdownUntil,
bool force,
bool restartHeartbeats,
Status* result);
OID _getMyRID_inlock() const;
int _getMyId_inlock() const;
OpTime _getMyLastAppliedOpTime_inlock() const;
OpTime _getMyLastDurableOpTime_inlock() const;
/**
* Bottom half of setFollowerMode.
*
* May reschedule itself after the current election, so it is not sufficient to
* wait for a callback scheduled to execute this method to complete. Instead,
* supply an event, "finishedSettingFollowerMode", and wait for that event to
* be signaled. Do not observe "*success" until after the event is signaled.
*/
void _setFollowerModeFinish(const MemberState& newState,
const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
bool* success);
/**
* Helper method for updating our tracking of the last optime applied by a given node.
* This is only valid to call on replica sets.
* "configVersion" will be populated with our config version if it and the configVersion
* of "args" differ.
*
* The OldUpdatePositionArgs version provides support for the pre-3.2.4 format of
* UpdatePositionArgs.
*/
Status _setLastOptime_inlock(const OldUpdatePositionArgs::UpdateInfo& args,
long long* configVersion);
Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
long long* configVersion);
/**
* This function will report our position externally (like upstream) if necessary.
*
* Takes in a unique lock, that must already be locked, on _mutex.
*
* Lock will be released after this method finishes.
*/
void _reportUpstream_inlock(stdx::unique_lock lock);
/**
* Helpers to set the last applied and durable OpTime.
*/
void _setMyLastAppliedOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
/**
* Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
* into the replica set config members array that corresponds to the "target", or -1 if
* "target" is not in _rsConfig.
*/
void _scheduleHeartbeatToTarget(const HostAndPort& target, int targetIndex, Date_t when);
/**
* Processes each heartbeat response.
*
* Schedules additional heartbeats, triggers elections and step downs, etc.
*/
void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
int targetIndex);
void _handleHeartbeatResponseV1(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
int targetIndex);
void _trackHeartbeatHandle(const StatusWith& handle);
void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
/**
* Helper for _handleHeartbeatResponse.
*
* Updates the lastDurableOpTime and lastAppliedOpTime associated with the member at
* "memberIndex" in our config.
*/
void _updateOpTimesFromHeartbeat_inlock(int targetIndex,
const OpTime& durableOpTime,
const OpTime& appliedOpTime);
/**
* Starts a heartbeat for each member in the current config. Called while holding _topoMutex
* and replCoord _mutex.
*/
void _startHeartbeats_inlock();
/**
* Cancels all heartbeats. Called while holding _topoMutex and replCoord _mutex.
*/
void _cancelHeartbeats_inlock();
/**
* Cancels all heartbeats, then starts a heartbeat for each member in the current config.
* Called while holding _topoMutex and replCoord _mutex.
*/
void _restartHeartbeats_inlock();
/**
* Asynchronously sends a heartbeat to "target". "targetIndex" is the index
* into the replica set config members array that corresponds to the "target", or -1 if
* we don't have a valid replica set config.
*
* Scheduled by _scheduleHeartbeatToTarget.
*/
void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex);
MemberState _getMemberState_inlock() const;
/**
* Starts loading the replication configuration from local storage, and if it is valid,
* schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
* config (sets _rsConfig and _thisMembersConfigIndex).
* Returns true if it finishes loading the local config, which most likely means there
* was no local config at all or it was invalid in some way, and false if there was a valid
* config detected but more work is needed to set it as the local config (which will be
* handled by the callback to _finishLoadLocalConfig).
*/
bool _startLoadLocalConfig(OperationContext* txn);
/**
* Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState
* to kConfigSteady, so that we can begin processing heartbeats and reconfigs.
*/
void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& localConfig,
const StatusWith& lastOpTimeStatus,
const StatusWith& lastVoteStatus);
/**
* Start replicating data, and does an initial sync if needed first.
*/
void _startDataReplication(OperationContext* txn,
stdx::function startCompleted = nullptr);
/**
* Stops replicating data by stopping the applier, fetcher and such.
*/
void _stopDataReplication(OperationContext* txn);
/**
* Finishes the work of processReplSetInitiate() while holding _topoMutex, in the event of
* a successful quorum check.
*/
void _finishReplSetInitiate(const ReplicaSetConfig& newConfig, int myIndex);
/**
* Finishes the work of processReplSetReconfig while holding _topoMutex, in the event of
* a successful quorum check.
*/
void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
int myIndex);
/**
* Changes _rsConfigState to newState, and notify any waiters.
*/
void _setConfigState_inlock(ConfigState newState);
/**
* Updates the cached value, _memberState, to match _topCoord's reported
* member state, from getMemberState().
*
* Returns an enum indicating what action to take after releasing _mutex, if any.
* Call performPostMemberStateUpdateAction on the return value after releasing
* _mutex.
*/
PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock();
/**
* Performs a post member-state update action. Do not call while holding _mutex.
*/
void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);
/**
* Begins an attempt to elect this node.
* Called after an incoming heartbeat changes this node's view of the set such that it
* believes it can be elected PRIMARY.
* For proper concurrency, must be called while holding _topoMutex.
*
* For old style elections the election path is:
* _startElectSelf()
* _onFreshnessCheckComplete()
* _onElectCmdRunnerComplete()
* For V1 (raft) style elections the election path is:
* _startElectSelfV1()
* _onDryRunComplete()
* _writeLastVoteForMyElection()
* _startVoteRequester()
* _onVoteRequestComplete()
*/
void _startElectSelf();
void _startElectSelfV1();
/**
* Callback called when the FreshnessChecker has completed; checks the results and
* decides whether to continue election proceedings.
**/
void _onFreshnessCheckComplete();
/**
* Callback called when the ElectCmdRunner has completed; checks the results and
* decides whether to complete the election and change state to primary.
**/
void _onElectCmdRunnerComplete();
/**
* Callback called when the dryRun VoteRequester has completed; checks the results and
* decides whether to conduct a proper election.
* "originalTerm" was the term during which the dry run began, if the term has since
* changed, do not run for election.
*/
void _onDryRunComplete(long long originalTerm);
/**
* Writes the last vote in persistent storage after completing dry run successfully.
* This job will be scheduled to run in DB worker threads.
*/
void _writeLastVoteForMyElection(LastVote lastVote,
const ReplicationExecutor::CallbackArgs& cbData);
/**
* Starts VoteRequester to run the real election when last vote write has completed.
*/
void _startVoteRequester(long long newTerm);
/**
* Callback called when the VoteRequester has completed; checks the results and
* decides whether to change state to primary and alert other nodes of our primary-ness.
* "originalTerm" was the term during which the election began, if the term has since
* changed, do not step up as primary.
*/
void _onVoteRequestComplete(long long originalTerm);
/**
* Callback called after a random delay, to prevent repeated election ties.
*/
void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData);
/**
* Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
* ignored and no error is thrown.
*
* Must be scheduled as a callback.
*/
void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
const HostAndPort& host);
/**
* Schedules a request that the given host step down; logs any errors.
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
ReplicationExecutor::EventHandle _stepDownStart();
/**
* Completes a step-down of the current node. Must be run with a global
* shared or global exclusive lock.
* Signals 'finishedEvent' on successful completion.
*/
void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicationExecutor::EventHandle& finishedEvent);
/**
* Schedules a replica set config change.
*/
void _scheduleHeartbeatReconfig(const ReplicaSetConfig& newConfig);
/**
* Callback that continues a heartbeat-initiated reconfig after a running election
* completes.
*/
void _heartbeatReconfigAfterElectionCanceled(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig);
/**
* Method to write a configuration transmitted via heartbeat message to stable storage.
*/
void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd,
const ReplicaSetConfig& newConfig);
/**
* Conclusion actions of a heartbeat-triggered reconfiguration.
*/
void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
StatusWith myIndex);
/**
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
*/
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
const StatusWith& responseStatus);
/**
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
* servers; set _lastCommittedOpTime to this new entry, if greater than the current entry.
*/
void _updateLastCommittedOpTime_inlock();
/**
* This is used to set a floor of "newOpTime" on the OpTimes we will consider committed.
* This prevents entries from before our election from counting as committed in our view,
* until our election (the "newOpTime" op) has been committed.
*/
void _setFirstOpTimeOfMyTerm(const OpTime& newOpTime);
/**
* Callback that attempts to set the current term in topology coordinator and
* relinquishes primary if the term actually changes and we are primary.
* *updateTermResult will be the result of the update term attempt.
* Returns the finish event if it does not finish in this function, for example,
* due to stepdown, otherwise the returned EventHandle is invalid.
*/
EventHandle _updateTerm_incallback(
long long term, TopologyCoordinator::UpdateTermResult* updateTermResult = nullptr);
/**
* Callback that processes the ReplSetMetadata returned from a command run against another
* replica set member and updates protocol version 1 information (most recent optime that is
* committed, member id of the current PRIMARY, the current config version and the current term)
* Returns the finish event which is invalid if the process has already finished.
*/
EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
/**
* Blesses a snapshot to be used for new committed reads.
*/
void _updateCommittedSnapshot_inlock(SnapshotInfo newCommittedSnapshot);
/**
* Drops all snapshots and clears the "committed" snapshot.
*/
void _dropAllSnapshots_inlock();
/**
* Bottom half of _scheduleNextLivenessUpdate.
* Must be called with _topoMutex held.
*/
void _scheduleNextLivenessUpdate_inlock();
/**
* Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
* longer visible, and reschedules itself.
*/
void _handleLivenessTimeout(const ReplicationExecutor::CallbackArgs& cbData);
/**
* If "updatedMemberId" is the current _earliestMemberId, cancels the current
* _handleLivenessTimeout callback and calls _scheduleNextLivenessUpdate to schedule a new one.
* Returns immediately otherwise.
*/
void _cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId);
/**
* Cancels all outstanding _priorityTakeover callbacks.
*/
void _cancelPriorityTakeover_inlock();
/**
* Cancels the current _handleElectionTimeout callback and reschedules a new callback.
*/
void _cancelAndRescheduleElectionTimeout_inlock();
/**
* Callback which starts an election if this node is electable and using protocolVersion 1.
* "isPriorityTakeover" is used to determine if the caller was a priority takeover or not and
* log messages accordingly.
*/
void _startElectSelfIfEligibleV1(bool isPriorityTakeover);
/**
* Resets the term of last vote to 0 to prevent any node from voting for term 0.
* Returns the event handle that indicates when last vote write finishes.
*/
EventHandle _resetElectionInfoOnProtocolVersionUpgrade(const ReplicaSetConfig& oldConfig,
const ReplicaSetConfig& newConfig);
/**
* Schedules work and returns handle to callback.
* If work cannot be scheduled due to shutdown, returns empty handle.
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
CallbackHandle _scheduleWork(const CallbackFn& work);
/**
* Schedules work to be run no sooner than 'when' and returns handle to callback.
* If work cannot be scheduled due to shutdown, returns empty handle.
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);
/**
* Schedules work and waits for completion.
*/
void _scheduleWorkAndWaitForCompletion(const CallbackFn& work);
/**
* Schedules work to be run no sooner than 'when' and waits for completion.
*/
void _scheduleWorkAtAndWaitForCompletion(Date_t when, const CallbackFn& work);
/**
* Schedules DB work and returns handle to callback.
* If work cannot be scheduled due to shutdown, returns empty handle.
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
CallbackHandle _scheduleDBWork(const CallbackFn& work);
/**
* Does the actual work of scheduling the work with the executor.
* Used by _scheduleWork() and _scheduleWorkAt() only.
* Do not call this function directly.
*/
CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
/**
* Creates an event.
* Returns invalid event handle if the executor is shutting down.
* Otherwise aborts on non-shutdown error.
*/
EventHandle _makeEvent();
/**
* Schedule notification of election win.
*/
void _scheduleElectionWinNotification();
/**
* Wrap a function into executor callback.
* If the callback is cancelled, the given function won't run.
*/
executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function& work);
/**
* Scans all nodes to find the latest optime in the replset, so we know when there is no
* more to catch up before the timeout. It also schedules the actual catch-up once we get the
* response from the freshness scan.
*/
void _scanOpTimeForCatchUp_inlock();
/**
* Wait for data replication until we reach the latest optime, or the timeout expires.
* "originalTerm" is the term when catch-up work is scheduled and used to detect
* the step-down (and potential following step-up) after catch-up gets scheduled.
*/
void _catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
Milliseconds timeout,
long long originalTerm);
/**
* Finish catch-up mode and start drain mode.
* If "startToDrain" is true, the node enters drain mode. Otherwise, it goes back to secondary
* mode.
*/
void _finishCatchUpOplog_inlock(bool startToDrain);
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
// (R) Read-only in concurrent operation; no synchronization required.
// (S) Self-synchronizing; access in any way from any context.
// (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
// Access in any context.
// (M) Reads and writes guarded by _mutex
// (X) Reads and writes guarded by _topoMutex
// (MX) Must hold _mutex and _topoMutex to write; must either hold _mutex or _topoMutex
// to read.
// (GX) Readable under a global intent lock. Must either hold global lock in exclusive
// mode (MODE_X) or both hold global lock in shared mode (MODE_S) and hold _topoMutex
// to write.
// (I) Independently synchronized, see member variable comment.
// When both _mutex and _topoMutex are needed, the caller must follow the strict locking order
// to avoid deadlock: _topoMutex must be held before locking _mutex.
// In other words, _topoMutex can never be locked while holding _mutex.
// Protects member data of this ReplicationCoordinator.
mutable stdx::mutex _mutex; // (S)
// Protects member data of the TopologyCoordinator.
mutable stdx::mutex _topoMutex; // (S)
// Handles to actively queued heartbeats.
HeartbeatHandles _heartbeatHandles; // (X)
// When this node does not know itself to be a member of a config, it adds
// every host that sends it a heartbeat request to this set, and also starts
// sending heartbeat requests to that host. This set is cleared whenever
// a node discovers that it is a member of a config.
unordered_set _seedList; // (X)
// Parsed command line arguments related to replication.
const ReplSettings _settings; // (R)
// Mode of replication specified by _settings.
const Mode _replMode; // (R)
// Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
std::unique_ptr _topCoord; // (X)
// If the executor is owned then this will be set, but should not be used.
// This is only used to clean up and destroy the replExec if owned.
std::unique_ptr _replExecutorIfOwned; // (S)
// Executor that drives the topology coordinator.
ReplicationExecutor& _replExecutor; // (S)
// Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
std::unique_ptr _externalState; // (PS)
// Our RID, used to identify us to our sync source when sending replication progress
// updates upstream. Set once in startReplication() and then never modified again.
OID _myRID; // (M)
// Rollback ID. Used to check if a rollback happened during some interval of time
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid; // (M)
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
WaiterList _replicationWaiterList; // (M)
// list of information about clients waiting for a particular opTime.
// Does *not* own the WaiterInfos.
WaiterList _opTimeWaiterList; // (M)
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
// Election ID of the last election that resulted in this node becoming primary.
OID _electionId; // (M)
// Vector containing known information about each member (such as replication
// progress and member ID) in our replica set or each member replicating from
// us in a master-slave deployment. In master/slave, the first entry is
// guaranteed to correspond to ourself. In replica sets where we don't have a
// valid config or are in state REMOVED then the vector will be a single element
// just with info about ourself. In replica sets with a valid config the elements
// will be in the same order as the members in the replica set config, thus
// the entry for ourself will be at _thisMemberConfigIndex.
SlaveInfoVector _slaveInfo; // (M)
// Used to signal threads waiting for changes to _memberState.
stdx::condition_variable _memberStateChange; // (M)
// Current ReplicaSet state.
MemberState _memberState; // (MX)
// Used to signal threads waiting for the applier to finish draining.
stdx::condition_variable _drainFinishedCond; // (M)
// True if we are waiting for the applier to finish draining.
bool _isWaitingForDrainToComplete; // (M)
// True if we are waiting for oplog catch-up to finish.
bool _isCatchingUp = false; // (M)
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
// Represents the configuration state of the coordinator, which controls how and when
// _rsConfig may change. See the state transition diagram in the type definition of
// ConfigState for details.
ConfigState _rsConfigState; // (M)
// The current ReplicaSet configuration object, including the information about tag groups
// that is used to satisfy write concern requests with named gle modes.
ReplicaSetConfig _rsConfig; // (MX)
// This member's index position in the current config.
int _selfIndex; // (MX)
// Event handle that should be signaled whenever new heartbeat data comes in.
ReplicationExecutor::EventHandle _stepDownWaiter; // (M)
// State for conducting an election of this node.
// the presence of a non-null _freshnessChecker pointer indicates that an election is
// currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
// indicates this instead.
// Only one election is allowed at a time.
std::unique_ptr _freshnessChecker; // (X)
std::unique_ptr _electCmdRunner; // (X)
std::unique_ptr _voteRequester; // (X)
// Event that the election code will signal when the in-progress election completes.
// Unspecified value when _freshnessChecker is NULL.
ReplicationExecutor::EventHandle _electionFinishedEvent; // (X)
// Event that the election code will signal when the in-progress election dry run completes,
// which includes writing the last vote and scheduling the real election.
ReplicationExecutor::EventHandle _electionDryRunFinishedEvent; // (X)
// Event that the stepdown code will signal when the in-progress stepdown completes.
ReplicationExecutor::EventHandle _stepDownFinishedEvent; // (X)
// Whether we slept last time we attempted an election but possibly tied with other nodes.
bool _sleptLastElection; // (X)
// Flag that indicates whether writes to databases other than "local" are allowed. Used to
// answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
// Always true for standalone nodes and masters in master-slave relationships.
bool _canAcceptNonLocalWrites; // (GX)
// Flag that indicates whether reads from databases other than "local" are allowed. Unlike
// _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
// and we do not require that its observers be strongly synchronized. Accidentally
// providing the prior value for a limited period of time is acceptable. Also unlike
// _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
AtomicUInt32 _canServeNonLocalReads; // (S)
// OpTime of the latest committed operation. Matches the concurrency level of _slaveInfo.
OpTime _lastCommittedOpTime; // (M)
// OpTime representing our transition to PRIMARY and the start of our term.
// _lastCommittedOpTime cannot be set to an earlier OpTime.
OpTime _firstOpTimeOfMyTerm; // (M)
// Storage interface used by data replicator.
StorageInterface* _storage; // (PS)
// Data Replicator used to replicate data
std::unique_ptr _dr; // (M)
// Hands out the next snapshot name.
AtomicUInt64 _snapshotNameGenerator; // (S)
// The OpTimes and SnapshotNames for all snapshots newer than the current commit point, kept in
// sorted order. Any time this is changed, you must also update _uncommittedSnapshotsSize.
std::deque _uncommittedSnapshots; // (M)
// A cache of the size of _uncommittedSnapshots that can be read without any locking.
// May only be written to while holding _mutex.
AtomicUInt64 _uncommittedSnapshotsSize; // (I)
// The non-null OpTime and SnapshotName of the current snapshot used for committed reads, if
// there is one.
// When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front().
boost::optional _currentCommittedSnapshot; // (M)
// Used to signal threads that are waiting for new committed snapshots.
stdx::condition_variable _currentCommittedSnapshotCond; // (M)
// The cached current term. It's in sync with the term in topology coordinator.
long long _cachedTerm = OpTime::kUninitializedTerm; // (M)
// Callback Handle used to cancel a scheduled LivenessTimeout callback.
ReplicationExecutor::CallbackHandle _handleLivenessTimeoutCbh; // (M)
// Callback Handle used to cancel a scheduled ElectionTimeout callback.
ReplicationExecutor::CallbackHandle _handleElectionTimeoutCbh; // (M)
// Election timeout callback will not run before this time.
// If this date is Date_t(), the callback is either unscheduled or canceled.
// Used for testing only.
Date_t _handleElectionTimeoutWhen; // (M)
// Callback Handle used to cancel a scheduled PriorityTakeover callback.
ReplicationExecutor::CallbackHandle _priorityTakeoverCbh; // (M)
// Priority takeover callback will not run before this time.
// If this date is Date_t(), the callback is either unscheduled or canceled.
// Used for testing only.
Date_t _priorityTakeoverWhen; // (M)
// Callback handle used by waitForStartUpComplete() to block until configuration
// is loaded and external state threads have been started (unless this node is an arbiter).
// Used for testing only.
CallbackHandle _finishLoadLocalConfigCbh; // (M)
// The id of the earliest member, for which the handleLivenessTimeout callback has been
// scheduled. We need this so that we don't needlessly cancel and reschedule the callback on
// every liveness update.
int _earliestMemberId = -1; // (M)
// Cached copy of the current config protocol version.
AtomicInt64 _protVersion; // (S)
// Lambda indicating durability of storageEngine.
stdx::function _isDurableStorageEngine; // (R)
// This setting affects the Applier prefetcher behavior.
mutable stdx::mutex _indexPrefetchMutex;
ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I)
};
} // namespace repl
} // namespace mongo