/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <boost/optional.hpp>
#include <memory>
#include <vector>
#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
class Timer;
template <typename T>
class StatusWith;
namespace executor {
struct ConnectionPoolStats;
} // namespace executor
namespace rpc {
class OplogQueryMetadata;
class ReplSetMetadata;
} // namespace rpc
namespace repl {
class ElectCmdRunner;
class FreshnessChecker;
class HandshakeArgs;
class HeartbeatResponseAction;
class LastVote;
class OplogReader;
class ReplicationProcess;
class ReplSetRequestVotesArgs;
class ReplSetConfig;
class SyncSourceFeedback;
class StorageInterface;
class TopologyCoordinator;
class VoteRequester;
class ReplicationCoordinatorImpl : public ReplicationCoordinator {
MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
public:
ReplicationCoordinatorImpl(ServiceContext* serviceContext,
const ReplSettings& settings,
std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
std::unique_ptr<executor::TaskExecutor> executor,
std::unique_ptr<TopologyCoordinator> topoCoord,
ReplicationProcess* replicationProcess,
StorageInterface* storage,
int64_t prngSeed);
virtual ~ReplicationCoordinatorImpl();
// ================== Members of public ReplicationCoordinator API ===================
virtual void startup(OperationContext* opCtx) override;
virtual void shutdown(OperationContext* opCtx) override;
virtual const ReplSettings& getSettings() const override;
virtual Mode getReplicationMode() const override;
virtual MemberState getMemberState() const override;
virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override;
virtual bool isInPrimaryOrSecondaryState() const override;
virtual Seconds getSlaveDelaySecs() const override;
virtual void clearSyncSourceBlacklist() override;
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern);
virtual Status stepDown(OperationContext* opCtx,
bool force,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime);
virtual bool isMasterForReportingPurposes();
virtual bool canAcceptWritesForDatabase(OperationContext* opCtx, StringData dbName);
virtual bool canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, StringData dbName);
bool canAcceptWritesFor(OperationContext* opCtx, const NamespaceString& ns) override;
bool canAcceptWritesFor_UNSAFE(OperationContext* opCtx, const NamespaceString& ns) override;
virtual Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions& writeConcern) const;
virtual Status checkCanServeReadsFor(OperationContext* opCtx,
const NamespaceString& ns,
bool slaveOk);
virtual Status checkCanServeReadsFor_UNSAFE(OperationContext* opCtx,
const NamespaceString& ns,
bool slaveOk);
virtual bool shouldRelaxIndexConstraints(OperationContext* opCtx, const NamespaceString& ns);
virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts);
virtual void setMyLastAppliedOpTime(const OpTime& opTime);
virtual void setMyLastDurableOpTime(const OpTime& opTime);
virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);
virtual void resetMyLastOpTimes();
virtual void setMyHeartbeatMessage(const std::string& msg);
virtual OpTime getMyLastAppliedOpTime() const override;
virtual OpTime getMyLastDurableOpTime() const override;
virtual Status waitUntilOpTimeForReadUntil(OperationContext* opCtx,
const ReadConcernArgs& readConcern,
boost::optional<Date_t> deadline) override;
virtual Status waitUntilOpTimeForRead(OperationContext* opCtx,
const ReadConcernArgs& readConcern) override;
virtual OID getElectionId() override;
virtual OID getMyRID() const override;
virtual int getMyId() const override;
virtual Status setFollowerMode(const MemberState& newState) override;
virtual ApplierState getApplierState() override;
virtual void signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) override;
virtual Status waitForDrainFinish(Milliseconds timeout) override;
virtual void signalUpstreamUpdater() override;
virtual Status resyncData(OperationContext* opCtx, bool waitUntilCompleted) override;
virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand() const override;
virtual Status processReplSetGetStatus(BSONObjBuilder* result,
ReplSetGetStatusResponseStyle responseStyle) override;
virtual void fillIsMasterForReplSet(IsMasterResponse* result) override;
virtual void appendSlaveInfoData(BSONObjBuilder* result) override;
virtual ReplSetConfig getConfig() const override;
virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
virtual void advanceCommitPoint(const OpTime& committedOpTime) override;
virtual void cancelAndRescheduleElectionTimeout() override;
virtual Status setMaintenanceMode(bool activate) override;
virtual bool getMaintenanceMode() override;
virtual Status processReplSetSyncFrom(OperationContext* opCtx,
const HostAndPort& target,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;
virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
ReplSetHeartbeatResponse* response) override;
virtual Status processReplSetReconfig(OperationContext* opCtx,
const ReplSetReconfigArgs& args,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetInitiate(OperationContext* opCtx,
const BSONObj& configObj,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) override;
virtual Status processReplSetElect(const ReplSetElectArgs& args,
BSONObjBuilder* response) override;
virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
long long* configVersion) override;
virtual Status processHandshake(OperationContext* opCtx,
const HandshakeArgs& handshake) override;
virtual bool buildsIndexes() override;
virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
bool durablyWritten) override;
virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
virtual WriteConcernOptions getGetLastErrorDefault() override;
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
virtual bool isReplEnabled() const override;
virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
DataConsistency consistency) override;
virtual bool shouldChangeSyncSource(
const HostAndPort& currentSource,
const rpc::ReplSetMetadata& replMetadata,
boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
virtual OpTime getLastCommittedOpTime() const override;
virtual Status processReplSetRequestVotes(OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) override;
virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) override;
virtual bool isV1ElectionProtocol() const override;
virtual bool getWriteConcernMajorityShouldJournal() override;
virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
virtual void dropAllSnapshots() override;
/**
* Get current term from topology coordinator
*/
virtual long long getTerm() override;
// Returns the ServiceContext where this instance runs.
virtual ServiceContext* getServiceContext() override {
return _service;
}
virtual Status updateTerm(OperationContext* opCtx, long long term) override;
virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override;
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* opCtx,
const Timestamp& untilSnapshot) override;
virtual void appendDiagnosticBSON(BSONObjBuilder*) override;
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
virtual size_t getNumUncommittedSnapshots() override;
virtual WriteConcernOptions populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) override;
virtual ReplSettings::IndexPrefetchConfig getIndexPrefetchConfig() const override;
virtual void setIndexPrefetchConfig(const ReplSettings::IndexPrefetchConfig cfg) override;
virtual Status stepUpIfEligible() override;
virtual Status abortCatchupIfNeeded() override;
// ================== Test support API ===================
/**
* If called after startReplication(), blocks until all asynchronous
* activities associated with replication start-up complete.
*/
void waitForStartUpComplete_forTest();
/**
* Gets the replica set configuration in use by the node.
*/
ReplSetConfig getReplicaSetConfig_forTest();
/**
* Returns scheduled time of election timeout callback.
* Returns Date_t() if callback is not scheduled.
*/
Date_t getElectionTimeout_forTest() const;
/*
* Return a randomized offset amount that is scaled in proportion to the size of the
* _electionTimeoutPeriod.
*/
Milliseconds getRandomizedElectionOffset_forTest();
/**
* Returns the scheduled time of the priority takeover callback. If a priority
* takeover has not been scheduled, returns boost::none.
*/
boost::optional<Date_t> getPriorityTakeover_forTest() const;
/**
* Returns the scheduled time of the catchup takeover callback. If a catchup
* takeover has not been scheduled, returns boost::none.
*/
boost::optional<Date_t> getCatchupTakeover_forTest() const;
/**
* Returns the catchup takeover CallbackHandle.
*/
executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const;
/**
* Simple wrappers around _setLastOptime_inlock to make it easier to test.
*/
Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
/**
* Simple test wrappers that expose private methods.
*/
boost::optional<OpTime> calculateStableOpTime_forTest(const std::set<OpTime>& candidates,
const OpTime& commitPoint);
void cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates, OpTime stableOpTime);
std::set<OpTime> getStableOpTimeCandidates_forTest();
boost::optional<OpTime> getStableOpTime_forTest();
/**
* Non-blocking version of updateTerm.
* Returns event handle that we can use to wait for the operation to complete.
* When the operation is complete (waitForEvent() returns), 'updateResult' will be set
* to a status telling if the term increased or a stepdown was triggered.
*/
executor::TaskExecutor::EventHandle updateTerm_forTest(
long long term, TopologyCoordinator::UpdateTermResult* updateResult);
/**
* If called after _startElectSelfV1(), blocks until all asynchronous
* activities associated with election complete.
*/
void waitForElectionFinish_forTest();
/**
* If called after _startElectSelfV1(), blocks until all asynchronous
* activities associated with election dry run complete, including writing
* last vote and scheduling the real election.
*/
void waitForElectionDryRunFinish_forTest();
/**
* Waits until a stepdown command has begun. Callers should ensure that the stepdown attempt
* won't fully complete before this method is called, or this method may never return.
*/
void waitForStepDownAttempt_forTest();
private:
using CallbackFn = executor::TaskExecutor::CallbackFn;
using CallbackHandle = executor::TaskExecutor::CallbackHandle;
using EventHandle = executor::TaskExecutor::EventHandle;
using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
const executor::TaskExecutor::CallbackFn& work)>;
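// Illustrative only: a ScheduleFn is expected to adapt one of the executor's scheduling entry
// points. A minimal sketch (the exact call sites live in the .cpp file) might look like:
//
//     ScheduleFn scheduleFn = [this](const executor::TaskExecutor::CallbackFn& work) {
//         return _replExecutor->scheduleWork(work);
//     };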
class LoseElectionGuardV1;
class LoseElectionDryRunGuardV1;
/**
* Configuration states for a replica set node.
*
* Transition diagram:
*
* PreStart ------------------> ReplicationDisabled
* |
* |
* v
* StartingUp -------> Uninitialized <------> Initiating
* \ ^ |
* ------- | |
* | | |
* v v |
* Reconfig <---> Steady <----> HBReconfig |
* ^ /
* | /
* \ /
* -----------------------
*/
enum ConfigState {
kConfigPreStart,
kConfigStartingUp,
kConfigReplicationDisabled,
kConfigUninitialized,
kConfigSteady,
kConfigInitiating,
kConfigReconfiguring,
kConfigHBReconfiguring
};
/**
* Type describing actions to take after a change to the MemberState _memberState.
*/
enum PostMemberStateUpdateAction {
kActionNone,
kActionCloseAllConnections, // Also indicates that we should clear sharding state.
kActionFollowerModeStateChange,
kActionWinElection,
kActionStartSingleNodeElection
};
// Abstract struct that holds information about clients waiting for replication.
// Subclasses need to define how to notify them.
struct Waiter {
Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern);
virtual ~Waiter() = default;
BSONObj toBSON() const;
std::string toString() const;
// It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex.
virtual void notify_inlock() = 0;
const OpTime opTime;
const WriteConcernOptions* writeConcern = nullptr;
};
// When ThreadWaiter gets notified, it will signal the condition variable.
//
// This is used when a thread wants to block inline until the opTime is reached with the given
// writeConcern.
struct ThreadWaiter : public Waiter {
ThreadWaiter(OpTime _opTime,
const WriteConcernOptions* _writeConcern,
stdx::condition_variable* _condVar);
void notify_inlock() override;
stdx::condition_variable* condVar = nullptr;
};
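// A hedged usage sketch for ThreadWaiter, assuming local names 'opTime', 'writeConcern' and
// 'lock' (a stdx::unique_lock holding _mutex): the waiting thread registers the waiter, blocks
// on the condition variable until notify_inlock() is called, then deregisters itself.
//
//     stdx::condition_variable condVar;
//     ThreadWaiter waiter(opTime, &writeConcern, &condVar);
//     _replicationWaiterList.add_inlock(&waiter);
//     while (!_doneWaitingForReplication_inlock(opTime, writeConcern)) {
//         condVar.wait(lock);
//     }
//     _replicationWaiterList.remove_inlock(&waiter);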
// When the waiter is notified, finishCallback will be called while holding replCoord _mutex
// since WaiterLists are protected by _mutex.
//
// This is used when we want to run a callback when the opTime is reached.
struct CallbackWaiter : public Waiter {
using FinishFunc = stdx::function<void()>;
CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback);
void notify_inlock() override;
// The callback that will be called when this waiter is notified.
FinishFunc finishCallback = nullptr;
};
class WaiterGuard;
class WaiterList {
public:
using WaiterType = Waiter*;
// Adds waiter into the list.
void add_inlock(WaiterType waiter);
// Returns whether waiter is found and removed.
bool remove_inlock(WaiterType waiter);
// Signals and removes all waiters that satisfy the condition.
void signalAndRemoveIf_inlock(stdx::function<bool(WaiterType)> fun);
// Signals and removes all waiters from the list.
void signalAndRemoveAll_inlock();
private:
std::vector<WaiterType> _list;
};
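// For example (illustrative only, for replication waiters that carry a write concern), waiters
// whose write concern is now satisfied can be woken and removed in one pass with a predicate:
//
//     _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
//         return _doneWaitingForReplication_inlock(waiter->opTime, *waiter->writeConcern);
//     });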
typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles;
// The state and logic of primary catchup.
//
// When start() is called, CatchupState will schedule the timeout callback. When we get
// responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is
// set.
// The primary exits catchup mode when any of the following happens.
// 1) My last applied optime reaches the target optime, if we've received a heartbeat from all
// nodes.
// 2) Catchup timeout expires.
// 3) Primary steps down.
// 4) The primary has to roll back to catch up.
// 5) The primary is too stale to catch up.
//
// On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the
// life cycle of the state object aligns with the conceptual state.
// In shutdown, the timeout callback will be canceled by the executor and the state is safe to
// destroy.
//
// Any function of the state must be called while holding _mutex.
class CatchupState {
public:
CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {}
// start() can only be called once.
void start_inlock();
// Reset the state itself to destruct the state.
void abort_inlock();
// Heartbeat calls this function to update the target optime.
void signalHeartbeatUpdate_inlock();
private:
ReplicationCoordinatorImpl* _repl; // Not owned.
// Callback handle used to cancel a scheduled catchup timeout callback.
executor::TaskExecutor::CallbackHandle _timeoutCbh;
// Handle to a Waiter that contains the current target optime to reach after which
// we can exit catchup mode.
std::unique_ptr<CallbackWaiter> _waiter;
};
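// Illustrative lifecycle sketch (a sketch only; the exact call sites live in the .cpp file):
// the new primary constructs and starts the state, heartbeat responses update the target
// optime, and abort destroys the state by resetting the owning pointer.
//
//     _catchupState = stdx::make_unique<CatchupState>(this);
//     _catchupState->start_inlock();                  // schedules the catchup timeout
//     _catchupState->signalHeartbeatUpdate_inlock();  // called on each heartbeat response
//     _catchupState->abort_inlock();                  // exits catchup mode, resets _catchupState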
void _resetMyLastOpTimes_inlock();
/**
* Returns the _writeConcernMajorityJournalDefault of our current _rsConfig.
*/
bool getWriteConcernMajorityShouldJournal_inlock() const;
/**
* Returns the OpTime of the current committed snapshot, if one exists.
*/
OpTime _getCurrentCommittedSnapshotOpTime_inlock() const;
/**
* Returns the OpTime of the current committed snapshot converted to LogicalTime.
*/
LogicalTime _getCurrentCommittedLogicalTime_inlock() const;
/**
* Verifies that ReadConcernArgs match node's readConcern.
*/
Status _validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern);
/**
* Helper to update our saved config, cancel any pending heartbeats, and kick off sending
* new heartbeats based on the new config.
*
* Returns an action to be performed after unlocking _mutex, via
* _performPostMemberStateUpdateAction.
*/
PostMemberStateUpdateAction _setCurrentRSConfig_inlock(OperationContext* opCtx,
const ReplSetConfig& newConfig,
int myIndex);
/**
* Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
*/
void _wakeReadyWaiters_inlock();
/**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
* node replica set member's stepDown period ends.
*/
void _handleTimePassing(const executor::TaskExecutor::CallbackArgs& cbData);
/**
* Helper method for _awaitReplication that takes an already locked unique_lock, but leaves
* operation timing to the caller.
*/
Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock,
OperationContext* opCtx,
const OpTime& opTime,
const WriteConcernOptions& writeConcern);
/**
* Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
*
* If the writeConcern is 'majority', also waits for _currentCommittedSnapshot to be newer than
* minSnapshot.
*/
bool _doneWaitingForReplication_inlock(const OpTime& opTime,
const WriteConcernOptions& writeConcern);
Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;
/**
* Wakes up threads in the process of handling a stepdown request based on whether the
* TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to
* complete.
*/
void _signalStepDownWaiterIfReady_inlock();
bool _canAcceptWritesFor_inlock(const NamespaceString& ns);
OID _getMyRID_inlock() const;
int _getMyId_inlock() const;
OpTime _getMyLastAppliedOpTime_inlock() const;
OpTime _getMyLastDurableOpTime_inlock() const;
/**
* Helper method for updating our tracking of the last optime applied by a given node.
* This is only valid to call on replica sets.
* "configVersion" will be populated with our config version if it and the configVersion
* of "args" differ.
*/
Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
long long* configVersion);
/**
* This function will report our position externally (like upstream) if necessary.
*
* Takes in a unique lock, that must already be locked, on _mutex.
*
* Lock will be released after this method finishes.
*/
void _reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock);
/**
* Helpers to set the last applied and durable OpTime.
*/
void _setMyLastAppliedOpTime_inlock(const OpTime& opTime,
bool isRollbackAllowed,
DataConsistency consistency);
void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);
/**
* Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
* into the replica set config members array that corresponds to the "target", or -1 if
* "target" is not in _rsConfig.
*/
void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, int targetIndex, Date_t when);
/**
* Processes each heartbeat response.
*
* Schedules additional heartbeats, triggers elections and step downs, etc.
*/
void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
int targetIndex);
void _trackHeartbeatHandle_inlock(
const StatusWith<executor::TaskExecutor::CallbackHandle>& handle);
void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle);
/*
* Return a randomized offset amount that is scaled in proportion to the size of the
* _electionTimeoutPeriod. Used to add randomization to an election timeout.
*/
Milliseconds _getRandomizedElectionOffset_inlock();
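// For instance (numbers assumed for illustration only): with a 10 second election timeout and
// an offset limit fraction of 0.15, the returned offset is a pseudorandom value in roughly the
// [0ms, 1500ms) range, so members do not all time out and call for an election simultaneously.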
/**
* Starts a heartbeat for each member in the current config. Called while holding _mutex.
*/
void _startHeartbeats_inlock();
/**
* Cancels all heartbeats. Called while holding replCoord _mutex.
*/
void _cancelHeartbeats_inlock();
/**
* Cancels all heartbeats, then starts a heartbeat for each member in the current config.
* Called while holding replCoord _mutex.
*/
void _restartHeartbeats_inlock();
/**
* Asynchronously sends a heartbeat to "target". "targetIndex" is the index
* into the replica set config members array that corresponds to the "target", or -1 if
* we don't have a valid replica set config.
*
* Scheduled by _scheduleHeartbeatToTarget_inlock.
*/
void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex);
MemberState _getMemberState_inlock() const;
/**
* Starts loading the replication configuration from local storage, and if it is valid,
* schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
* config (sets _rsConfig and _thisMembersConfigIndex).
* Returns true if it finishes loading the local config, which most likely means there
* was no local config at all or it was invalid in some way, and false if there was a valid
* config detected but more work is needed to set it as the local config (which will be
* handled by the callback to _finishLoadLocalConfig).
*/
bool _startLoadLocalConfig(OperationContext* opCtx);
/**
* Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState
* to kConfigSteady, so that we can begin processing heartbeats and reconfigs.
*/
void _finishLoadLocalConfig(const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& localConfig,
const StatusWith<OpTime>& lastOpTimeStatus,
const StatusWith<LastVote>& lastVoteStatus);
/**
* Start replicating data, and does an initial sync if needed first.
*/
void _startDataReplication(OperationContext* opCtx,
stdx::function<void()> startCompleted = nullptr);
/**
* Stops replicating data by stopping the applier, fetcher and such.
*/
void _stopDataReplication(OperationContext* opCtx);
/**
* Finishes the work of processReplSetInitiate() in the event of a successful quorum check.
*/
void _finishReplSetInitiate(OperationContext* opCtx,
const ReplSetConfig& newConfig,
int myIndex);
/**
* Finishes the work of processReplSetReconfig, in the event of
* a successful quorum check.
*/
void _finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
bool isForceReconfig,
int myIndex,
const executor::TaskExecutor::EventHandle& finishedEvent);
/**
* Changes _rsConfigState to newState, and notifies any waiters.
*/
void _setConfigState_inlock(ConfigState newState);
/**
* Updates the cached value, _memberState, to match _topCoord's reported
* member state, from getMemberState().
*
* Returns an enum indicating what action to take after releasing _mutex, if any.
* Call performPostMemberStateUpdateAction on the return value after releasing
* _mutex.
*
* Note: opCtx may be null as currently not all paths thread an OperationContext all the way
* down, but it must be non-null for any calls that change _canAcceptNonLocalWrites.
*/
PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(
OperationContext* opCtx);
/**
* Performs a post member-state update action. Do not call while holding _mutex.
*/
void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);
/**
* Begins an attempt to elect this node.
* Called after an incoming heartbeat changes this node's view of the set such that it
* believes it can be elected PRIMARY.
* For proper concurrency, start methods must be called while holding _mutex.
*
* For old style elections the election path is:
* _startElectSelf_inlock()
* _onFreshnessCheckComplete()
* _onElectCmdRunnerComplete()
* For V1 (raft) style elections the election path is:
* _startElectSelfV1() or _startElectSelfV1_inlock()
* _onDryRunComplete()
* _writeLastVoteForMyElection()
* _startVoteRequester_inlock()
* _onVoteRequestComplete()
*/
void _startElectSelf_inlock();
void _startElectSelfV1_inlock(TopologyCoordinator::StartElectionReason reason);
void _startElectSelfV1(TopologyCoordinator::StartElectionReason reason);
/**
* Callback called when the FreshnessChecker has completed; checks the results and
* decides whether to continue election proceedings.
**/
void _onFreshnessCheckComplete();
/**
* Callback called when the ElectCmdRunner has completed; checks the results and
* decides whether to complete the election and change state to primary.
**/
void _onElectCmdRunnerComplete();
/**
* Callback called when the dryRun VoteRequester has completed; checks the results and
* decides whether to conduct a proper election.
* "originalTerm" was the term during which the dry run began, if the term has since
* changed, do not run for election.
*/
void _onDryRunComplete(long long originalTerm);
/**
* Writes the last vote in persistent storage after completing dry run successfully.
* This job will be scheduled to run in DB worker threads.
*/
void _writeLastVoteForMyElection(LastVote lastVote,
const executor::TaskExecutor::CallbackArgs& cbData);
/**
* Starts VoteRequester to run the real election when last vote write has completed.
*/
void _startVoteRequester_inlock(long long newTerm);
/**
* Callback called when the VoteRequester has completed; checks the results and
* decides whether to change state to primary and alert other nodes of our primary-ness.
* "originalTerm" was the term during which the election began, if the term has since
* changed, do not step up as primary.
*/
void _onVoteRequestComplete(long long originalTerm);
/**
* Callback called after a random delay, to prevent repeated election ties.
*/
void _recoverFromElectionTie(const executor::TaskExecutor::CallbackArgs& cbData);
/**
* Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
* ignored and no error is thrown.
*
* Must be scheduled as a callback.
*/
void _unblacklistSyncSource(const executor::TaskExecutor::CallbackArgs& cbData,
const HostAndPort& host);
/**
* Schedules a request that the given host step down; logs any errors.
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
/**
* Schedules stepdown to run with the global exclusive lock.
*/
executor::TaskExecutor::EventHandle _stepDownStart();
/**
* Completes a step-down of the current node. Must be run with a global
* shared or global exclusive lock.
* Signals 'finishedEvent' on successful completion.
*/
void _stepDownFinish(const executor::TaskExecutor::CallbackArgs& cbData,
const executor::TaskExecutor::EventHandle& finishedEvent);
/**
* Schedules a replica set config change.
*/
void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);
/**
* Method to write a configuration transmitted via heartbeat message to stable storage.
*/
void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd,
const ReplSetConfig& newConfig);
/**
* Conclusion actions of a heartbeat-triggered reconfiguration.
*/
void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
StatusWith<int> myIndex);
/**
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
*
* Requires "lock" to own _mutex, and returns the same unique_lock.
*/
stdx::unique_lock<stdx::mutex> _handleHeartbeatResponseAction_inlock(
const HeartbeatResponseAction& action,
const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
stdx::unique_lock<stdx::mutex> lock);
/**
* Updates the last committed OpTime to be "committedOpTime" if it is more recent than the
* current last committed OpTime.
*/
void _advanceCommitPoint_inlock(const OpTime& committedOpTime);
/**
* Scan the memberData and determine the highest last applied or last
* durable optime present on a majority of servers; set _lastCommittedOpTime to this
* new entry. Wake any threads waiting for replication that now have their
* write concern satisfied.
*
* Whether the last applied or last durable op time is used depends on whether
* the config getWriteConcernMajorityShouldJournal is set.
*/
void _updateLastCommittedOpTime_inlock();
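// A worked example (values assumed for illustration): in a five-member set whose members report
// last durable optimes with timestamps 20, 18, 15, 12 and 5, the third-highest value, 15, is
// replicated to a majority, so _lastCommittedOpTime advances to 15 and waiters with
// w:"majority" at or below 15 are signaled.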
/**
* Callback that attempts to set the current term in topology coordinator and
* relinquishes primary if the term actually changes and we are primary.
* *updateTermResult will be the result of the update term attempt.
* Returns the finish event if it does not finish in this function, for example,
* due to stepdown, otherwise the returned EventHandle is invalid.
*/
EventHandle _updateTerm_inlock(
long long term, TopologyCoordinator::UpdateTermResult* updateTermResult = nullptr);
/**
* Callback that processes the ReplSetMetadata returned from a command run against another
* replica set member and so long as the config version in the metadata matches the replica set
* config version this node currently has, updates the current term.
*
* This does NOT update this node's notion of the commit point.
*
* Returns the finish event which is invalid if the process has already finished.
*/
EventHandle _processReplSetMetadata_inlock(const rpc::ReplSetMetadata& replMetadata);
/**
* Prepares a metadata object for ReplSetMetadata.
*/
void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const;
/**
* Prepares a metadata object for OplogQueryMetadata.
*/
void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const;
/**
* Blesses a snapshot to be used for new committed reads.
*/
void _updateCommittedSnapshot_inlock(const OpTime& newCommittedSnapshot);
/**
* A helper method that returns the current stable optime based on the current commit point and
* set of stable optime candidates.
*/
boost::optional<OpTime> _getStableOpTime_inlock();
/**
* Calculates the 'stable' replication optime given a set of optime candidates and the
* current commit point. The stable optime is the greatest optime in 'candidates' that is
* also less than or equal to 'commitPoint'.
*/
boost::optional<OpTime> _calculateStableOpTime(const std::set<OpTime>& candidates,
const OpTime& commitPoint);
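// A worked example (values assumed for illustration): with candidate optimes {3, 4, 6} and a
// commit point of 5, the stable optime is 4, the greatest candidate that does not exceed the
// commit point; with a commit point of 2 no candidate is eligible and nothing (boost::none)
// would be returned.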
/**
* Removes any optimes from the optime set 'candidates' that are less than
* 'stableOpTime'.
*/
void _cleanupStableOpTimeCandidates(std::set<OpTime>* candidates, OpTime stableOpTime);
/**
* Calculates and sets the value of the 'stable' replication optime for the storage engine.
* See ReplicationCoordinatorImpl::_calculateStableOpTime for a definition of 'stable', in
* this context.
*/
void _setStableTimestampForStorage_inlock();
/**
* Drops all snapshots and clears the "committed" snapshot.
*/
void _dropAllSnapshots_inlock();
/**
* Bottom half of _scheduleNextLivenessUpdate.
* Must be called with _mutex held.
*/
void _scheduleNextLivenessUpdate_inlock();
/**
* Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
* longer visible, and reschedules itself.
*/
void _handleLivenessTimeout(const executor::TaskExecutor::CallbackArgs& cbData);
/**
* If "updatedMemberId" is the current _earliestMemberId, cancels the current
* _handleLivenessTimeout callback and calls _scheduleNextLivenessUpdate to schedule a new one.
* Returns immediately otherwise.
*/
void _cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId);
/**
* Cancels all outstanding _priorityTakeover callbacks.
*/
void _cancelPriorityTakeover_inlock();
/**
* Cancels all outstanding _catchupTakeover callbacks.
*/
void _cancelCatchupTakeover_inlock();
/**
* Cancels the currently scheduled _handleElectionTimeout callback, if any, and reschedules a
* new callback.
*/
void _cancelAndRescheduleElectionTimeout_inlock();
/**
* Callback which starts an election if this node is electable and using protocolVersion 1.
*/
void _startElectSelfIfEligibleV1(TopologyCoordinator::StartElectionReason reason);
/**
* Resets the term of last vote to 0 to prevent any node from voting for term 0.
*/
void _resetElectionInfoOnProtocolVersionUpgrade(OperationContext* opCtx,
const ReplSetConfig& oldConfig,
const ReplSetConfig& newConfig);
/**
* Schedules work to be run no sooner than 'when' and returns handle to callback.
* If work cannot be scheduled due to shutdown, returns empty handle.
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);
/**
* Creates an event.
* Returns invalid event handle if the executor is shutting down.
* Otherwise aborts on non-shutdown error.
*/
EventHandle _makeEvent();
/**
* Wraps a function into an executor callback.
* If the callback is cancelled, the given function won't run.
*/
executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);
/**
* Finish catch-up mode and start drain mode.
*/
void _enterDrainMode_inlock();
/**
* Waits for the config state to leave kConfigStartingUp, which indicates that start() has
* finished.
*/
void _waitForStartUpComplete();
/**
* Cancels the running election, if any, and returns an event that will be signaled when the
* canceled election completes. If there is no running election, returns an invalid event
* handle.
*/
executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock();
/**
* Waits until the optime of the current node is at least the 'opTime'.
*/
Status _waitUntilOpTime(OperationContext* opCtx,
bool isMajorityReadConcern,
OpTime opTime,
boost::optional<Date_t> deadline = boost::none);
/**
* Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
* Supports local and majority readConcern.
*/
// TODO: remove when SERVER-29729 is done
Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx,
const ReadConcernArgs& readConcern);
/**
* Waits until the deadline or until the optime of the current node is at least the clusterTime
* specified in 'readConcern'. Supports local and majority readConcern.
* If maxTimeMS and deadline are both specified, it waits for min(maxTimeMS, deadline).
*/
Status _waitUntilClusterTimeForRead(OperationContext* opCtx,
const ReadConcernArgs& readConcern,
boost::optional<Date_t> deadline);
/**
* Returns a pseudorandom number no less than 0 and less than limit (which must be positive).
*/
int64_t _nextRandomInt64_inlock(int64_t limit);
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
// (R) Read-only in concurrent operation; no synchronization required.
// (S) Self-synchronizing; access in any way from any context.
// (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
// Access in any context.
// (M) Reads and writes guarded by _mutex
// (GM) Readable under any global intent lock. Must hold both the global lock in exclusive
// mode (MODE_X) and hold _mutex to write.
// (I) Independently synchronized, see member variable comment.
// Protects member data of this ReplicationCoordinator.
mutable stdx::mutex _mutex; // (S)
// Handles to actively queued heartbeats.
HeartbeatHandles _heartbeatHandles; // (M)
// When this node does not know itself to be a member of a config, it adds
// every host that sends it a heartbeat request to this set, and also starts
// sending heartbeat requests to that host. This set is cleared whenever
// a node discovers that it is a member of a config.
stdx::unordered_set<HostAndPort> _seedList; // (M)
// Back pointer to the ServiceContext that has started the instance.
ServiceContext* const _service; // (S)
// Parsed command line arguments related to replication.
const ReplSettings _settings; // (R)
// Mode of replication specified by _settings.
const Mode _replMode; // (R)
// Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
std::unique_ptr<TopologyCoordinator> _topCoord; // (M)
// Executor that drives the topology coordinator.
std::unique_ptr<executor::TaskExecutor> _replExecutor; // (S)
// Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS)
// Our RID, used to identify us to our sync source when sending replication progress
// updates upstream. Set once in startReplication() and then never modified again.
OID _myRID; // (M)
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
WaiterList _replicationWaiterList; // (M)
// list of information about clients waiting for a particular opTime.
// Does *not* own the WaiterInfos.
WaiterList _opTimeWaiterList; // (M)
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
// Election ID of the last election that resulted in this node becoming primary.
OID _electionId; // (M)
// Used to signal threads waiting for changes to _memberState.
stdx::condition_variable _memberStateChange; // (M)
// Current ReplicaSet state.
MemberState _memberState; // (M)
// Used to signal threads waiting for the applier to finish draining.
stdx::condition_variable _drainFinishedCond; // (M)
ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M)
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
// Represents the configuration state of the coordinator, which controls how and when
// _rsConfig may change. See the state transition diagram in the type definition of
// ConfigState for details.
ConfigState _rsConfigState; // (M)
// The current ReplicaSet configuration object, including the information about tag groups
// that is used to satisfy write concern requests with named gle modes.
ReplSetConfig _rsConfig; // (M)
// This member's index position in the current config.
int _selfIndex; // (M)
// Condition variable signaled when new heartbeat data comes in; used to wake threads waiting
// in stepDown.
stdx::condition_variable _stepDownWaiters; // (M)
// State for conducting an election of this node.
// the presence of a non-null _freshnessChecker pointer indicates that an election is
// currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
// indicates this instead.
// Only one election is allowed at a time.
std::unique_ptr<FreshnessChecker> _freshnessChecker; // (M)
std::unique_ptr<ElectCmdRunner> _electCmdRunner; // (M)
std::unique_ptr<VoteRequester> _voteRequester; // (M)
// Event that the election code will signal when the in-progress election completes.
// Unspecified value when _freshnessChecker is NULL.
executor::TaskExecutor::EventHandle _electionFinishedEvent; // (M)
// Event that the election code will signal when the in-progress election dry run completes,
// which includes writing the last vote and scheduling the real election.
executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent; // (M)
// Whether we slept last time we attempted an election but possibly tied with other nodes.
bool _sleptLastElection; // (M)
// Flag that indicates whether writes to databases other than "local" are allowed. Used to
// answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
// Always true for standalone nodes and masters in master-slave relationships.
bool _canAcceptNonLocalWrites; // (GM)
// Flag that indicates whether reads from databases other than "local" are allowed. Unlike
// _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
// and we do not require that its observers be strongly synchronized. Accidentally
// providing the prior value for a limited period of time is acceptable. Also unlike
// _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
AtomicUInt32 _canServeNonLocalReads; // (S)
// ReplicationProcess used to hold information related to the replication and application of
// operations from the sync source.
ReplicationProcess* const _replicationProcess; // (PS)
// Storage interface used by initial syncer.
StorageInterface* _storage; // (PS)
// InitialSyncer used for initial sync.
std::shared_ptr<InitialSyncer>
_initialSyncer; // (I) pointer set under mutex, copied by callers.
// Hands out the next snapshot name.
AtomicUInt64 _snapshotNameGenerator; // (S)
// The OpTimes and SnapshotNames for all snapshots newer than the current commit point, kept in
// sorted order. Any time this is changed, you must also update _uncommittedSnapshotsSize.
std::deque<SnapshotInfo> _uncommittedSnapshots; // (M)
// A cache of the size of _uncommittedSnapshots that can be read without any locking.
// May only be written to while holding _mutex.
AtomicUInt64 _uncommittedSnapshotsSize; // (I)
// The non-null OpTime and SnapshotName of the current snapshot used for committed reads, if
// there is one.
// When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front().
boost::optional<SnapshotInfo> _currentCommittedSnapshot; // (M)
// A set of optimes that are used for computing the replication system's current 'stable'
// optime. Every time a node's applied optime is updated, it will be added to this set.
// Optimes that are older than the current stable optime should get removed from this set.
// This set should also be cleared if a rollback occurs.
std::set<OpTime> _stableOpTimeCandidates; // (M)
// Used to signal threads that are waiting for new committed snapshots.
stdx::condition_variable _currentCommittedSnapshotCond; // (M)
// Callback Handle used to cancel a scheduled LivenessTimeout callback.
executor::TaskExecutor::CallbackHandle _handleLivenessTimeoutCbh; // (M)
// Callback Handle used to cancel a scheduled ElectionTimeout callback.
executor::TaskExecutor::CallbackHandle _handleElectionTimeoutCbh; // (M)
// Election timeout callback will not run before this time.
// If this date is Date_t(), the callback is either unscheduled or canceled.
// Used for testing only.
Date_t _handleElectionTimeoutWhen; // (M)
// Callback Handle used to cancel a scheduled PriorityTakeover callback.
executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh; // (M)
// Priority takeover callback will not run before this time.
// If this date is Date_t(), the callback is either unscheduled or canceled.
// Used for testing only.
Date_t _priorityTakeoverWhen; // (M)
// Callback Handle used to cancel a scheduled CatchupTakeover callback.
executor::TaskExecutor::CallbackHandle _catchupTakeoverCbh; // (M)
// Catchup takeover callback will not run before this time.
// If this date is Date_t(), the callback is either unscheduled or canceled.
// Used for testing only.
Date_t _catchupTakeoverWhen; // (M)
// Callback handle used by _waitForStartUpComplete() to block until configuration
// is loaded and external state threads have been started (unless this node is an arbiter).
CallbackHandle _finishLoadLocalConfigCbh; // (M)
// The id of the earliest member, for which the handleLivenessTimeout callback has been
// scheduled. We need this so that we don't needlessly cancel and reschedule the callback on
// every liveness update.
int _earliestMemberId = -1; // (M)
// Cached copy of the current config protocol version.
AtomicInt64 _protVersion{1}; // (S)
// Source of random numbers used in setting election timeouts, etc.
PseudoRandom _random; // (M)
// This setting affects the Applier prefetcher behavior.
mutable stdx::mutex _indexPrefetchMutex;
ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I)
// The catchup state including all catchup logic. The presence of a non-null pointer indicates
// that the node is currently in catchup mode.
std::unique_ptr<CatchupState> _catchupState; // (X)
// Atomic-synchronized copy of Topology Coordinator's _term, for use by the public getTerm()
// function.
// This variable must be written immediately after _term, and thus its value can lag.
// Reading this value does not require the replication coordinator mutex to be locked.
AtomicInt64 _termShadow; // (S)
// When we decide to step down due to hearing about a higher term, we remember the term we heard
// here so we can update our term to match as part of finishing stepdown.
boost::optional _pendingTermUpdateDuringStepDown; // (M)
};
} // namespace repl
} // namespace mongo