/* Copyright 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
// Forward declarations of types that this header refers to before (or without) seeing their
// full definitions.
class BSONObj;
class ReplicaSetMonitor;
struct ReadPreferenceSetting;
// Shared-pointer alias used when passing ReplicaSetMonitor instances around.
typedef std::shared_ptr ReplicaSetMonitorPtr;
/**
 * Holds state about a replica set and provides a means to refresh the local view.
 * All methods perform the required synchronization to allow callers from multiple threads.
 *
 * Instances are handed out through std::shared_ptr (see createIfNeeded() and get()); the class
 * is non-copyable.
 */
class ReplicaSetMonitor : public std::enable_shared_from_this {
    MONGO_DISALLOW_COPYING(ReplicaSetMonitor);

public:
    class Refresher;

    /**
     * Callback type accepted by setAsynchronousConfigChangeHook() and
     * setSynchronousConfigChangeHook().
     */
    typedef stdx::function
        ConfigChangeHook;

    /**
     * Initializes local state.
     *
     * @param name   Name of the replica set.
     * @param seeds  Initial hosts of the set; must not be empty.
     */
    ReplicaSetMonitor(StringData name, const std::set& seeds);

    /**
     * Initializes local state from a MongoURI.
     * NOTE(review): presumably takes the set name and seed hosts from 'uri' -- confirm against
     * the implementation.
     */
    ReplicaSetMonitor(const MongoURI& uri);

    /**
     * Schedules the initial refresh task into task executor.
     */
    void init();

    /**
     * Returns a host matching the given read preference or an error, if no host matches.
     *
     * @param readPref Read preference to match against
     * @param maxWait If no host matching the specified read preference is readily available,
     *      wait for one to become available for up to the specified time and periodically
     *      refresh the view of the set. The call may return with an error earlier than the
     *      specified value, if none of the known hosts for the set are reachable within some
     *      number of attempts. Note that if a maxWait of 0ms is specified, this method may still
     *      attempt to contact every host in the replica set up to one time. Defaults to
     *      kDefaultFindHostTimeout.
     *
     * Known errors are:
     *  FailedToSatisfyReadPreference, if no node matching the read preference can be found.
     */
    StatusWith getHostOrRefresh(const ReadPreferenceSetting& readPref,
                                Milliseconds maxWait = kDefaultFindHostTimeout);

    /**
     * Returns the host we think is the current master or uasserts.
     *
     * This is a thin wrapper around getHostOrRefresh so this will also refresh our view if we
     * don't think there is a master at first. The main difference is that this will uassert
     * rather than returning an empty HostAndPort.
     */
    HostAndPort getMasterOrUassert();

    /**
     * Returns a refresher object that can be used to update our view of the set.
     * If a refresh is currently in-progress, the returned Refresher will participate in the
     * current refresh round.
     */
    Refresher startOrContinueRefresh();

    /**
     * Notifies this Monitor that a host has failed because of the specified error 'status' and
     * should be considered down.
     *
     * Call this when you get a connection error. If you get an error while trying to refresh our
     * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's
     * mutex.
     */
    void failedHost(const HostAndPort& host, const Status& status);

    /**
     * Returns true if this node is the master based ONLY on local data. Be careful, return may
     * be stale.
     */
    bool isPrimary(const HostAndPort& host) const;

    /**
     * Returns true if host is part of this set and is considered up (meaning it can accept
     * queries).
     */
    bool isHostUp(const HostAndPort& host) const;

    /**
     * Returns the minimum wire version supported across the replica set.
     */
    int getMinWireVersion() const;

    /**
     * Returns the maximum wire version supported across the replica set.
     */
    int getMaxWireVersion() const;

    /**
     * The name of the set.
     */
    std::string getName() const;

    /**
     * Returns a std::string with the format name/server1,server2.
     * If name is empty, returns just comma-separated list of servers.
     */
    std::string getServerAddress() const;

    /**
     * Is server part of this set? Uses only cached information.
     */
    bool contains(const HostAndPort& server) const;

    /**
     * Writes information about our cached view of the set to a BSONObjBuilder.
     */
    void appendInfo(BSONObjBuilder& b) const;

    /**
     * Returns true if the monitor knows a usable primary from its internal view.
     */
    bool isKnownToHaveGoodPrimary() const;

    /**
     * Marks the instance as removed to exit refresh sooner.
     */
    void markAsRemoved();

    /**
     * Creates a new ReplicaSetMonitor, if it doesn't already exist.
     *
     * @param name     Name of the replica set.
     * @param servers  Hosts handed to the monitor when one has to be created.
     */
    static std::shared_ptr createIfNeeded(const std::string& name,
                                          const std::set& servers);

    /**
     * Same as above, but described by a MongoURI instead of a name and a host list.
     */
    static std::shared_ptr createIfNeeded(const MongoURI& uri);

    /**
     * Gets the cached monitor for the set with the given name.
     *
     * NOTE(review): this signature has no 'createFromSeed' option, so presumably an empty
     * pointer is returned when no monitor is cached under 'name' and nothing is created --
     * confirm against the implementation.
     */
    static std::shared_ptr get(const std::string& name);

    /**
     * Removes the ReplicaSetMonitor registered for the given set name.
     *
     * NOTE(review): this signature has no 'clearSeedCache' option; whether any cached seed
     * information for the set is dropped as well is decided by the implementation -- confirm
     * there.
     */
    static void remove(const std::string& name);

    /**
     * Sets the hook to be called whenever the config of any replica set changes.
     * Currently only 1 globally, so this asserts if one already exists.
     *
     * The hook will be called from a fresh thread. It is responsible for initializing any
     * thread-local state and ensuring that no exceptions escape.
     *
     * The hook must not be changed while the program has multiple threads.
     */
    static void setAsynchronousConfigChangeHook(ConfigChangeHook hook);

    /**
     * Sets the hook to be called whenever the config of any replica set changes.
     * Currently only 1 globally, so this asserts if one already exists.
     *
     * The hook will be called inline while refreshing the ReplicaSetMonitor's view of the set
     * membership. It is important that the hook not block for long as it will be running under
     * the ReplicaSetMonitor's mutex.
     *
     * The hook must not be changed while the program has multiple threads.
     */
    static void setSynchronousConfigChangeHook(ConfigChangeHook hook);

    /**
     * Permanently stops all monitoring on replica sets and clears all cached information
     * as well. As a consequence, NEVER call this if you have other threads that have a
     * DBClientReplicaSet instance. This method should be used for unit test only.
     */
    static void cleanup();

    /**
     * Use this to speed up tests by disabling the sleep-and-retry loops and causing errors to be
     * reported immediately.
     */
    static void disableRefreshRetries_forTest();

    /**
     * Permanently stops all monitoring on replica sets.
     */
    static void shutdown();

    /**
     * Returns the refresh period that is given to all new SetStates.
     */
    static Seconds getDefaultRefreshPeriod();

    //
    // internal types (defined in replica_set_monitor_internal.h)
    //

    struct IsMasterReply;
    struct ScanState;
    struct SetState;
    typedef std::shared_ptr ScanStatePtr;
    typedef std::shared_ptr SetStatePtr;

    /**
     * Allows tests to set initial conditions and introspect the current state.
     *
     * Only _state is taken from the argument; the remaining members keep their in-class
     * defaults (in particular, _executor stays null).
     */
    explicit ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}

    ~ReplicaSetMonitor();

    /**
     * The default timeout, which will be used for finding a replica set host if the caller does
     * not explicitly specify it.
     */
    static const Seconds kDefaultFindHostTimeout;

    /**
     * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random.
     * This is required by the replica set driver spec. Set this to true in tests that need host
     * selection to be deterministic.
     *
     * NOTE: Used by unit-tests only.
     */
    static bool useDeterministicHostSelection;

private:
    /**
     * Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.)
     *
     * @param when  Time at which the refresh should run.
     */
    void _scheduleRefresh(Date_t when);

    /**
     * This function refreshes the replica set and calls _scheduleRefresh() again.
     */
    void _doScheduledRefresh(const executor::TaskExecutor::CallbackHandle& currentHandle);

    // Serializes refresh and protects _refresherHandle
    stdx::mutex _mutex;

    // Handle of the refresh task scheduled on the executor (guarded by _mutex).
    executor::TaskExecutor::CallbackHandle _refresherHandle;

    // State of the monitored set; the pointer itself is fixed at construction.
    const SetStatePtr _state;

    // Defaults to null so that the test-only constructor, which initializes nothing but _state,
    // never leaves this raw pointer with an indeterminate value.
    executor::TaskExecutor* _executor{nullptr};

    // Starts out false; presumably flipped by markAsRemoved().
    AtomicBool _isRemovedFromManager{false};
};
/**
* Refreshes the local view of a replica set.
*
* Use ReplicaSetMonitor::startOrContinueRefresh() to obtain a Refresher.
*
* Multiple threads can refresh a single set without any additional synchronization, however
* they must each use their own Refresher object.
*
* All logic related to choosing the hosts to contact and updating the SetState based on replies
* lives in this class.
*/
class ReplicaSetMonitor::Refresher {
public:
/**
* Contact hosts in the set to refresh our view, but stop once a host matches criteria.
* Returns the matching host or empty if none match after a refresh.
*
* NOTE(review): presumably this is what ReplicaSetMonitor::getHostOrRefresh() relies on; no
* getHostWithRefresh() is declared on ReplicaSetMonitor in this header -- confirm the caller.
*/
HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria);
/**
* Refresh all hosts. Equivalent to refreshUntilMatches with a criteria that never
* matches.
*
* This is intended to be called periodically, possibly from a background thread.
*/
void refreshAll();
//
// Remaining methods are only for testing and internal use.
// Callers are responsible for holding SetState::mutex before calling any of these methods.
//
/**
* Any passed-in pointers are shared with caller.
*
* If no scan is in-progress, this function is responsible for setting up a new scan.
*/
explicit Refresher(const SetStatePtr& setState);
/**
* Result of getNextStep(): the kind of action to take next and, for CONTACT_HOST, the host to
* contact. 'host' defaults to a default-constructed HostAndPort when none is supplied.
*/
struct NextStep {
enum StepKind {
CONTACT_HOST, /// Contact the returned host
WAIT, /// Wait on condition variable and try again.
DONE, /// No more hosts to contact in this Refresh round
};
// Explicit so that a bare StepKind is never silently converted into a NextStep.
explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort())
: step(step), host(host) {}
StepKind step;
HostAndPort host;
};
/**
* Returns the next step to take.
*
* By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is
* CONTACT_HOST.
*/
NextStep getNextStep();
/**
* Call this if a host returned from getNextStep successfully replied to an isMaster call.
* Negative latencyMicros are ignored.
*
* @param from           Host that sent the reply.
* @param latencyMicros  Latency of the isMaster call, in microseconds.
* @param reply          The isMaster reply document.
*/
void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply);
/**
* Call this if a host returned from getNextStep failed to reply to an isMaster call.
*
* @param host    Host that failed to reply.
* @param status  Error describing the failure.
*/
void failedHost(const HostAndPort& host, const Status& status);
/**
* Starts a new scan over the hosts in set.
*/
static ScanStatePtr startNewScan(const SetState* set);
private:
/**
* First, checks that the "reply" is not from a stale primary by comparing the electionId of
* "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply"
* belongs to a non-stale primary. Otherwise returns a failed status.
*
* The 'from' parameter specifies the node from which the response is received.
*
* Updates _set and _scan based on set-membership information from a master.
* Applies _scan->unconfirmedReplies to confirmed nodes.
* Does not update this host's node in _set->nodes.
*/
Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply);
/**
* Adjusts the _scan work queue based on information from this host.
* This should only be called with replies from non-masters.
* Does not update _set at all.
*/
void receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply);
/**
* Shared implementation of refreshUntilMatches and refreshAll.
* NULL criteria means refresh every host.
* Handles own locking.
*/
HostAndPort _refreshUntilMatches(const ReadPreferenceSetting* criteria);
// Both pointers are never NULL
// State of the set this Refresher updates.
SetStatePtr _set;
// Scan this Refresher takes part in.
ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
};
} // namespace mongo