summaryrefslogtreecommitdiff
path: root/src/mongo/client/replica_set_monitor.cpp
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-02-12 18:10:07 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-24 17:28:39 +0000
commit5c833792316fe542445683b865a66a64eb793a4a (patch)
tree7fabcf6e73d58641cfa51e6fd07e613426e80811 /src/mongo/client/replica_set_monitor.cpp
parentd000968754302bd8ebbeaee3f7fbed6b25778593 (diff)
downloadmongo-5c833792316fe542445683b865a66a64eb793a4a.tar.gz
SERVER-45962 feature flag to opt out of new RSM implementation
Diffstat (limited to 'src/mongo/client/replica_set_monitor.cpp')
-rw-r--r--src/mongo/client/replica_set_monitor.cpp1438
1 files changed, 7 insertions, 1431 deletions
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 2110d43cd8a..57f43547cc8 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -41,11 +41,9 @@
#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/read_preference.h"
-#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/server_options.h"
-#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -67,371 +65,9 @@ using std::vector;
// Failpoint for changing the default refresh period
MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
-namespace {
-
-// Pull nested types to top-level scope
-typedef ReplicaSetMonitor::IsMasterReply IsMasterReply;
-typedef ReplicaSetMonitor::ScanState ScanState;
-typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr;
-typedef ReplicaSetMonitor::SetState SetState;
-typedef ReplicaSetMonitor::SetStatePtr SetStatePtr;
-typedef ReplicaSetMonitor::Refresher Refresher;
-typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
-typedef SetState::Node Node;
-typedef SetState::Nodes Nodes;
-using executor::TaskExecutor;
-using CallbackArgs = TaskExecutor::CallbackArgs;
-using CallbackHandle = TaskExecutor::CallbackHandle;
-
-// Intentionally chosen to compare worse than all known latencies.
-const int64_t unknownLatency = numeric_limits<int64_t>::max();
-
-const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
-const Milliseconds kExpeditedRefreshPeriod(500);
-AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests.
-
-//
-// Helpers for stl algorithms
-//
-
-bool isMaster(const Node& node) {
- return node.isMaster;
-}
-
-bool opTimeGreater(const Node* lhs, const Node* rhs) {
- return lhs->opTime > rhs->opTime;
-}
-
-bool compareLatencies(const Node* lhs, const Node* rhs) {
- // NOTE: this automatically compares Node::unknownLatency worse than all others.
- return lhs->latencyMicros < rhs->latencyMicros;
-}
-
-bool hostsEqual(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host == rhs;
-}
-
-// Allows comparing two Nodes, or a HostAndPort and a Node.
-// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
-// implementations. For simplicity, no comparator should be used with collections of just
-// HostAndPort.
-struct CompareHosts {
- bool operator()(const Node& lhs, const Node& rhs) {
- return lhs.host < rhs.host;
- }
- bool operator()(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host < rhs;
- }
- bool operator()(const HostAndPort& lhs, const Node& rhs) {
- return lhs < rhs.host;
- }
- bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) {
- return lhs < rhs;
- }
-} compareHosts; // like an overloaded function, but able to pass to stl algorithms
-
-// The following structs should be treated as functions returning a UnaryPredicate.
-// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
-// They all hold their constructor argument by reference.
-
-struct HostIs {
- explicit HostIs(const HostAndPort& host) : _host(host) {}
- bool operator()(const HostAndPort& host) {
- return host == _host;
- }
- bool operator()(const Node& node) {
- return node.host == _host;
- }
- const HostAndPort& _host;
-};
-
-struct HostNotIn {
- explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
- bool operator()(const HostAndPort& host) {
- return !_hosts.count(host);
- }
- bool operator()(const Node& node) {
- return !_hosts.count(node.host);
- }
- const std::set<HostAndPort>& _hosts;
-};
-
-int32_t pingTimeMillis(const Node& node) {
- auto latencyMillis = node.latencyMicros / 1000;
- if (latencyMillis > numeric_limits<int32_t>::max()) {
- // In particular, Node::unknownLatency does not fit in an int32.
- return numeric_limits<int32_t>::max();
- }
- return latencyMillis;
-}
-
-/**
- * Replica set refresh period on the task executor.
- */
-const Seconds kDefaultRefreshPeriod(30);
-} // namespace
-
-// If we cannot find a host after 15 seconds of refreshing, give up
-const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
-
// Defaults to random selection as required by the spec
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
-Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
- Seconds r = kDefaultRefreshPeriod;
- static constexpr auto kPeriodField = "period"_sd;
- modifyReplicaSetMonitorDefaultRefreshPeriod.executeIf(
- [&r](const BSONObj& data) { r = Seconds{data.getIntField(kPeriodField)}; },
- [](const BSONObj& data) { return data.hasField(kPeriodField); });
- return r;
-}
-
-ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}
-
-ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
- : ReplicaSetMonitor(
- std::make_shared<SetState>(uri,
- &ReplicaSetMonitorManager::get()->getNotifier(),
- ReplicaSetMonitorManager::get()->getExecutor())) {}
-
-void ReplicaSetMonitor::init() {
- if (areRefreshRetriesDisabledForTest.load()) {
- // This is for MockReplicaSet. Those tests want to control when scanning happens.
- LOGV2_WARNING(20180,
- "*** Not starting background refresh because refresh retries are disabled.");
- return;
- }
-
- {
- stdx::lock_guard lk(_state->mutex);
- _state->init();
- }
-}
-
-void ReplicaSetMonitor::drop() {
- {
- stdx::lock_guard lk(_state->mutex);
- _state->drop();
- }
-}
-
-ReplicaSetMonitor::~ReplicaSetMonitor() {
- drop();
-}
-
-template <typename Callback>
-auto ReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const {
- auto wrappedCallback = [cb = std::forward<Callback>(cb),
- anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable {
- if (ErrorCodes::isCancelationError(cbArgs.status)) {
- // Do no more work if we're removed or canceled
- return;
- }
- invariant(cbArgs.status);
-
- stdx::lock_guard lk(anchor->mutex);
- if (anchor->isDropped) {
- return;
- }
-
- cb(cbArgs);
- };
- return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback));
-}
-
-void ReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) {
- // Reschedule the refresh
-
- if (!executor || isMocked) {
- // Without an executor, we can't do refreshes -- we're in a test
- return;
- }
-
- if (isDropped) { // already removed so no need to refresh
- LOGV2_DEBUG(20162,
- 1,
- "Stopping refresh for replica set {name} because it's removed",
- "name"_attr = name);
- return;
- }
-
- Milliseconds period = refreshPeriod;
- if (isExpedited) {
- period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod);
- }
-
- auto currentTime = now();
- auto possibleNextScanTime = currentTime + period;
- if (refresherHandle && //
- (strategy == SchedulingStrategy::kKeepEarlyScan) && //
- (nextScanTime > currentTime) && //
- (possibleNextScanTime >= nextScanTime)) {
- // If the next scan would be sooner than our desired, why cancel?
- return;
- }
-
- // Cancel out the last refresh
- if (auto currentHandle = std::exchange(refresherHandle, {})) {
- executor->cancel(currentHandle);
- }
-
- nextScanTime = possibleNextScanTime;
- LOGV2_DEBUG(20163,
- 1,
- "Next replica set scan scheduled for {nextScanTime}",
- "nextScanTime"_attr = nextScanTime);
- auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) {
- if (cbArgs.myHandle != refresherHandle)
- return; // We've been replaced!
-
- // It is possible that a waiter will have already expired by the point of this rescan.
- // Thus we notify here to trigger that logic.
- notify();
-
- _ensureScanInProgress(shared_from_this());
-
- // And now we set up the next one
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- LOGV2_DEBUG(20164,
- 1,
- "Cant schedule refresh for {name}. Executor shutdown in progress",
- "name"_attr = name);
- return;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(20184,
- "Can't continue refresh for replica set {name} due to {swHandle_getStatus}",
- "name"_attr = name,
- "swHandle_getStatus"_attr = redact(swHandle.getStatus()));
- fassertFailed(40140);
- }
-
- refresherHandle = std::move(swHandle.getValue());
-}
-
-SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
- Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait)
- .then([](const auto& hosts) {
- invariant(hosts.size());
- return hosts[0];
- })
- .semi();
-}
-
-SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait).semi();
-}
-
-Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
-
- stdx::lock_guard<Latch> lk(_state->mutex);
- if (_state->isDropped) {
- return Status(ErrorCodes::ReplicaSetMonitorRemoved,
- str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
- }
-
- auto out = _state->getMatchingHosts(criteria);
- if (!out.empty())
- return {std::move(out)};
-
- // Fail early without doing any more work if the timeout has already expired.
- if (maxWait <= Milliseconds(0))
- return _state->makeUnsatisfedReadPrefError(criteria);
-
- // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
- // dealing with maxWait.
- auto pf = makePromiseFuture<decltype(out)>();
- _state->waiters.emplace_back(
- SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)});
-
- // This must go after we set up the wait state to correctly handle unittests using
- // MockReplicaSet.
- _ensureScanInProgress(_state);
-
- // Switch to expedited scanning.
- _state->isExpedited = true;
- _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
-
- return std::move(pf.future);
-}
-HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
- return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
-}
-
-void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- if (node)
- node->markFailed(status);
- if (kDebugBuild)
- _state->checkInvariants();
-}
-
-bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isMaster : false;
-}
-
-bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isUp : false;
-}
-
-int ReplicaSetMonitor::getMinWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int minVersion = 0;
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- minVersion = std::max(minVersion, host.minWireVersion);
- }
- }
-
- return minVersion;
-}
-
-int ReplicaSetMonitor::getMaxWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int maxVersion = std::numeric_limits<int>::max();
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- maxVersion = std::min(maxVersion, host.maxWireVersion);
- }
- }
-
- return maxVersion;
-}
-
-std::string ReplicaSetMonitor::getName() const {
- // name is const so don't need to lock
- return _state->name;
-}
-
-std::string ReplicaSetMonitor::getServerAddress() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- // We return our setUri until first confirmation
- return _state->seedConnStr.isValid() ? _state->seedConnStr.toString()
- : _state->setUri.connectionString().toString();
-}
-
-const MongoURI& ReplicaSetMonitor::getOriginalUri() const {
- // setUri is const so no need to lock.
- return _state->setUri;
-}
-
-bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- return _state->seedNodes.count(host);
-}
-
shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name,
const set<HostAndPort>& servers) {
return ReplicaSetMonitorManager::get()->getOrCreateMonitor(
@@ -458,40 +94,6 @@ ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() {
return ReplicaSetMonitorManager::get()->getNotifier();
}
-// TODO move to correct order with non-statics before pushing
-void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
- if (forFTDC) {
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
- monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
- }
- return;
- }
-
- // NOTE: the format here must be consistent for backwards compatibility
- BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
-
- BSONObjBuilder builder;
- builder.append("addr", node.host.toString());
- builder.append("ok", node.isUp);
- builder.append("ismaster", node.isMaster); // intentionally not camelCase
- builder.append("hidden", false); // we don't keep hidden nodes in the set
- builder.append("secondary", node.isUp && !node.isMaster);
- builder.append("pingTimeMillis", pingTimeMillis(node));
-
- if (!node.tags.isEmpty()) {
- builder.append("tags", node.tags);
- }
-
- hosts.append(builder.obj());
- }
-}
-
void ReplicaSetMonitor::shutdown() {
ReplicaSetMonitorManager::get()->shutdown();
}
@@ -500,1041 +102,15 @@ void ReplicaSetMonitor::cleanup() {
ReplicaSetMonitorManager::get()->removeAllMonitors();
}
-void ReplicaSetMonitor::disableRefreshRetries_forTest() {
- areRefreshRetriesDisabledForTest.store(true);
-}
-
-bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- for (const auto& node : _state->nodes) {
- if (node.isMaster) {
- return true;
- }
- }
-
- return false;
-}
-
-void ReplicaSetMonitor::runScanForMockReplicaSet() {
- stdx::lock_guard<Latch> lk(_state->mutex);
- _ensureScanInProgress(_state);
-
- // This function should only be called from tests using MockReplicaSet and they should use the
- // synchronous path to complete before returning.
- invariant(_state->currentScan == nullptr);
-}
-
-void ReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) {
- Refresher(state).scheduleNetworkRequests();
-}
-
-Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
- if (_scan) {
- _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
- _scan->retryAllTriedHosts(_set->rand);
- return; // participate in in-progress scan
- }
-
- startNewScan();
-}
-
-void Refresher::scheduleNetworkRequests() {
- for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) {
- if (!_set->executor || _set->isMocked) {
- // If we're mocked, just schedule an isMaster
- scheduleIsMaster(ns.host);
- continue;
- }
-
- // cancel any scheduled isMaster calls that haven't yet been called
- Node* node = _set->findOrCreateNode(ns.host);
- if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) {
- _set->executor->cancel(handle);
- }
-
- // ensure that the call to isMaster is scheduled at most every 500ms
- auto swHandle =
- _set->scheduleWorkAt(node->nextPossibleIsMasterCall,
- [*this, host = ns.host](const CallbackArgs& cbArgs) mutable {
- scheduleIsMaster(host);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- _scan->markHostsToScanAsTried();
- break;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(
- 20185,
- "Can't continue scan for replica set {set_name} due to {swHandle_getStatus}",
- "set_name"_attr = _set->name,
- "swHandle_getStatus"_attr = redact(swHandle.getStatus()));
- fassertFailed(31176);
- }
-
- node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle));
- }
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::scheduleIsMaster(const HostAndPort& host) {
- if (_set->isMocked) {
- // MockReplicaSet only works with DBClient-style access since it injects itself into the
- // ScopedDbConnection pool connection creation.
- try {
- ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count());
-
- auto timer = Timer();
- auto reply = BSONObj();
- bool ignoredOutParam = false;
- conn->isMaster(ignoredOutParam, &reply);
- conn.done(); // return to pool on success.
-
- receivedIsMaster(host, timer.micros(), reply);
- } catch (DBException& ex) {
- failedHost(host, ex.toStatus());
- }
-
- return;
- }
-
- auto request = executor::RemoteCommandRequest(
- host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout);
- request.sslMode = _set->setUri.getSSLMode();
- auto status =
- _set->executor
- ->scheduleRemoteCommand(
- std::move(request),
- [copy = *this, host, timer = Timer()](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
- stdx::lock_guard lk(copy._set->mutex);
- // Ignore the reply and return if we are no longer the current scan. This might
- // happen if it was decided that the host we were contacting isn't part of the
- // set.
- if (copy._scan != copy._set->currentScan) {
- return;
- }
-
- // ensure that isMaster calls occur at most 500ms after the previous call ended
- if (auto node = copy._set->findNode(host)) {
- node->nextPossibleIsMasterCall =
- copy._set->executor->now() + Milliseconds(500);
- }
-
- if (result.response.isOK()) {
- // Not using result.response.elapsedMillis because higher precision is
- // useful for computing the rolling average.
- copy.receivedIsMaster(host, timer.micros(), result.response.data);
- } else {
- copy.failedHost(host, result.response.status);
- }
-
- // This reply may have discovered new hosts to contact so we need to schedule
- // them.
- copy.scheduleNetworkRequests();
- })
- .getStatus();
-
- if (!status.isOK()) {
- failedHost(host, status);
- // This is only called from scheduleNetworkRequests() which will still be looping, so we
- // don't need to call it here after updating the state.
- }
-}
-
-Refresher::NextStep Refresher::getNextStep() {
- // No longer the current scan
- if (_scan != _set->currentScan) {
- return NextStep(NextStep::DONE);
- }
-
- // Wait for all dispatched hosts to return before trying any fallback hosts.
- if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
- return NextStep(NextStep::WAIT);
- }
-
- // If we haven't yet found a master, try contacting unconfirmed hosts
- if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
- _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
- _scan->possibleNodes.clear();
- }
-
- if (_scan->hostsToScan.empty()) {
- // We've tried all hosts we can, so nothing more to do in this round.
- if (!_scan->foundUpMaster) {
- LOGV2_WARNING(
- 20181, "Unable to reach primary for set {set_name}", "set_name"_attr = _set->name);
-
- // Since we've talked to everyone we could but still didn't find a primary, we
- // do the best we can, and assume all unconfirmedReplies are actually from nodes
- // in the set (we've already confirmed that they think they are). This is
- // important since it allows us to bootstrap to a usable state even if we are
- // unable to talk to a master while starting up. As soon as we are able to
- // contact a master, we will remove any nodes that it doesn't think are part of
- // the set, undoing the damage we cause here.
-
- // NOTE: we don't modify seedNodes or notify about set membership change in this
- // case since it hasn't been confirmed by a master.
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- _set->findOrCreateNode(it->first)->update(it->second);
- }
-
- auto connStr = _set->possibleConnectionString();
- if (connStr != _set->workingConnStr) {
- _set->workingConnStr = std::move(connStr);
- _set->notifier->onPossibleSet(_set->workingConnStr);
- }
-
- // If at some point we care about lacking a primary, on it here
- _set->lastSeenMaster = {};
- }
-
- if (_scan->foundAnyUpNodes) {
- _set->consecutiveFailedScans = 0;
- } else {
- auto nScans = _set->consecutiveFailedScans++;
- if (nScans <= 10 || nScans % 10 == 0) {
- LOGV2(20165,
- "Cannot reach any nodes for set {set_name}. Please check network "
- "connectivity and the status of the set. This has happened for "
- "{set_consecutiveFailedScans} checks in a row.",
- "set_name"_attr = _set->name,
- "set_consecutiveFailedScans"_attr = _set->consecutiveFailedScans);
- }
- }
-
- // Makes sure all other Refreshers in this round return DONE
- _set->currentScan.reset();
- _set->notify();
-
- LOGV2_DEBUG(20166,
- 1,
- "Refreshing replica set {set_name} took {scan_timer_millis}ms",
- "set_name"_attr = _set->name,
- "scan_timer_millis"_attr = _scan->timer.millis());
-
- return NextStep(NextStep::DONE);
- }
-
- // Pop and return the next hostToScan.
- HostAndPort host = _scan->hostsToScan.front();
- _scan->hostsToScan.pop_front();
- _scan->waitingFor.insert(host);
- _scan->triedHosts.insert(host);
-
- return NextStep(NextStep::CONTACT_HOST, host);
-}
-
-void Refresher::receivedIsMaster(const HostAndPort& from,
- int64_t latencyMicros,
- const BSONObj& replyObj) {
- _scan->waitingFor.erase(from);
-
- const IsMasterReply reply(from, latencyMicros, replyObj);
-
- // Handle various failure cases
- if (!reply.ok) {
- failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"});
- return;
- }
-
- if (reply.setName != _set->name) {
- if (reply.raw["isreplicaset"].trueValue()) {
- // The reply came from a node in the state referred to as RSGhost in the SDAM
- // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event,
- // if a reply from a ghost offers a list of possible other members of the replica set,
- // and if this refresher has yet to find the replica set master, we add hosts listed in
- // the reply to the list of possible replica set members.
- if (!_scan->foundUpMaster) {
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- }
- } else {
- LOGV2_ERROR(20183,
- "replset name mismatch: expected \"{set_name}\", but remote node {from} "
- "has replset name \"{reply_setName}\", ismaster: {replyObj}",
- "set_name"_attr = _set->name,
- "from"_attr = from,
- "reply_setName"_attr = reply.setName,
- "replyObj"_attr = replyObj);
- }
-
- failedHost(from,
- {ErrorCodes::InconsistentReplicaSetNames,
- str::stream() << "Target replica set name " << reply.setName
- << " does not match the monitored set name " << _set->name});
- return;
- }
-
- if (reply.isMaster) {
- Status status = receivedIsMasterFromMaster(from, reply);
- if (!status.isOK()) {
- failedHost(from, status);
- return;
- }
- }
-
- if (_scan->foundUpMaster) {
- // We only update a Node if a master has confirmed it is in the set.
- _set->updateNodeIfInNodes(reply);
- } else {
- // Populate possibleNodes.
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- _scan->unconfirmedReplies[from] = reply;
- }
-
- // _set->nodes may still not have any nodes with isUp==true, but we have at least found a
- // connectible host that is that claims to be in the set.
- _scan->foundAnyUpNodes = true;
-
- _set->notify();
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::failedHost(const HostAndPort& host, const Status& status) {
- _scan->waitingFor.erase(host);
-
- Node* node = _set->findNode(host);
- if (node)
- node->markFailed(status);
-
- if (_scan->waitingFor.empty()) {
- // If this was the last host that needed a response, we should notify the SetState so that
- // we can fail any waiters that have timed out.
- _set->notify();
- }
-}
-
-void Refresher::startNewScan() {
- // The heuristics we use in deciding the order to contact hosts are designed to find a
- // master as quickly as possible. This is because we can't use any hosts we find until
- // we either get the latest set of members from a master or talk to all possible hosts
- // without finding a master.
-
- // TODO It might make sense to check down nodes first if the last seen master is still
- // marked as up.
-
- _scan = std::make_shared<ScanState>();
- _set->currentScan = _scan;
-
- int upNodes = 0;
- for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) {
- if (it->isUp) {
- // _scan the nodes we think are up first
- _scan->hostsToScan.push_front(it->host);
- upNodes++;
- } else {
- _scan->hostsToScan.push_back(it->host);
- }
- }
-
- // shuffle the queue, but keep "up" nodes at the front
- std::shuffle(
- _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg());
- std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg());
-
- if (!_set->lastSeenMaster.empty()) {
- // move lastSeenMaster to front of queue
- std::stable_partition(
- _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster));
- }
-}
-
-Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
- invariant(reply.isMaster);
-
- // Reject if config version is older. This is for backwards compatibility with nodes in pv0
- // since they don't have the same ordering with pv1 electionId.
- if (reply.configVersion < _set->configVersion) {
- return {
- ErrorCodes::NotMaster,
- str::stream() << "Node " << from << " believes it is primary, but its config version "
- << reply.configVersion << " is older than the most recent config version "
- << _set->configVersion};
- }
-
- if (reply.electionId.isSet()) {
- // ElectionIds are only comparable if they are of the same protocol version. However, since
- // isMaster has no protocol version field, we use the configVersion instead. This works
- // because configVersion needs to be incremented whenever the protocol version is changed.
- if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() &&
- _set->maxElectionId.compare(reply.electionId) > 0) {
- return {
- ErrorCodes::NotMaster,
- str::stream() << "Node " << from << " believes it is primary, but its election id "
- << reply.electionId << " is older than the most recent election id "
- << _set->maxElectionId};
- }
-
- _set->maxElectionId = reply.electionId;
- }
-
- _set->configVersion = reply.configVersion;
-
- // Mark all nodes as not master. We will mark ourself as master before releasing the lock.
- // NOTE: we use a "last-wins" policy if multiple hosts claim to be master.
- for (size_t i = 0; i < _set->nodes.size(); i++) {
- _set->nodes[i].isMaster = false;
- }
-
- // Check if the master agrees with our current list of nodes.
- // REMINDER: both _set->nodes and reply.members are sorted.
- if (_set->nodes.size() != reply.members.size() ||
- !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) {
- LOGV2_DEBUG(20167,
- 2,
- "Adjusting nodes in our view of replica set {set_name} based on master reply: "
- "{reply_raw}",
- "set_name"_attr = _set->name,
- "reply_raw"_attr = redact(reply.raw));
-
- // remove non-members from _set->nodes
- _set->nodes.erase(
- std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)),
- _set->nodes.end());
-
- // add new members to _set->nodes
- for (auto& host : reply.members) {
- _set->findOrCreateNode(host);
- }
-
- // replace hostToScan queue with untried normal hosts. can both add and remove
- // hosts from the queue.
- _scan->hostsToScan.clear();
- _scan->enqueAllUntriedHosts(reply.members, _set->rand);
-
- if (!_scan->waitingFor.empty()) {
- // make sure we don't wait for any hosts that aren't considered members
- std::set<HostAndPort> newWaitingFor;
- std::set_intersection(reply.members.begin(),
- reply.members.end(),
- _scan->waitingFor.begin(),
- _scan->waitingFor.end(),
- std::inserter(newWaitingFor, newWaitingFor.end()));
- _scan->waitingFor.swap(newWaitingFor);
- }
- }
-
- bool changedHosts = reply.members != _set->seedNodes;
- bool changedPrimary = reply.host != _set->lastSeenMaster;
- if (changedHosts || changedPrimary) {
- ++_set->seedGen;
- _set->seedNodes = reply.members;
- _set->seedConnStr = _set->confirmedConnectionString();
-
- // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
- // and we want to record our changes
- LOGV2(20168,
- "Confirmed replica set for {set_name} is {set_seedConnStr}",
- "set_name"_attr = _set->name,
- "set_seedConnStr"_attr = _set->seedConnStr);
-
- _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives);
- }
-
- // Update our working string
- _set->workingConnStr = _set->seedConnStr;
-
- // Update other nodes's information based on replies we've already seen
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- // this ignores replies from hosts not in _set->nodes (as modified above)
- _set->updateNodeIfInNodes(it->second);
- }
- _scan->unconfirmedReplies.clear();
-
- _scan->foundUpMaster = true;
- _set->lastSeenMaster = reply.host;
-
- return Status::OK();
-}
-
-void IsMasterReply::parse(const BSONObj& obj) {
- try {
- raw = obj.getOwned(); // don't use obj again after this line
-
- ok = raw["ok"].trueValue();
- if (!ok)
- return;
-
- setName = raw["setName"].str();
- hidden = raw["hidden"].trueValue();
- secondary = raw["secondary"].trueValue();
-
- minWireVersion = raw["minWireVersion"].numberInt();
- maxWireVersion = raw["maxWireVersion"].numberInt();
-
- // hidden nodes can't be master, even if they claim to be.
- isMaster = !hidden && raw["ismaster"].trueValue();
-
- if (isMaster && raw.hasField("electionId")) {
- electionId = raw["electionId"].OID();
- }
-
- configVersion = raw["setVersion"].numberInt();
-
- const string primaryString = raw["primary"].str();
- primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);
-
- // both hosts and passives, but not arbiters, are considered "normal hosts"
- members.clear();
- BSONForEach(host, raw.getObjectField("hosts")) {
- members.insert(HostAndPort(host.String()));
- }
- BSONForEach(host, raw.getObjectField("passives")) {
- members.insert(HostAndPort(host.String()));
- passives.insert(HostAndPort(host.String()));
- }
-
- tags = raw.getObjectField("tags");
- BSONObj lastWriteField = raw.getObjectField("lastWrite");
- if (!lastWriteField.isEmpty()) {
- if (auto lastWrite = lastWriteField["lastWriteDate"]) {
- lastWriteDate = lastWrite.date();
- }
-
- uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
- }
- } catch (const std::exception& e) {
- ok = false;
- LOGV2(20169,
- "exception while parsing isMaster reply: {e_what} {obj}",
- "e_what"_attr = e.what(),
- "obj"_attr = obj);
- }
-}
-
-Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
-
-void Node::markFailed(const Status& status) {
- if (isUp) {
- LOGV2(20170,
- "Marking host {host} as failed{causedBy_status}",
- "host"_attr = host,
- "causedBy_status"_attr = causedBy(redact(status)));
-
- isUp = false;
- }
-
- isMaster = false;
-}
-
-bool Node::matches(const ReadPreference pref) const {
- if (!isUp) {
- LOGV2_DEBUG(20171, 3, "Host {host} is not up", "host"_attr = host);
- return false;
- }
-
- LOGV2_DEBUG(20172,
- 3,
- "Host {host} is {isMaster_primary_not_primary}",
- "host"_attr = host,
- "isMaster_primary_not_primary"_attr = (isMaster ? "primary" : "not primary"));
- if (pref == ReadPreference::PrimaryOnly) {
- return isMaster;
- }
-
- if (pref == ReadPreference::SecondaryOnly) {
- return !isMaster;
- }
-
- return true;
-}
-
-bool Node::matches(const BSONObj& tag) const {
- BSONForEach(tagCriteria, tag) {
- if (SimpleBSONElementComparator::kInstance.evaluate(
- this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) {
- return false;
- }
- }
-
- return true;
-}
-
-void Node::update(const IsMasterReply& reply) {
- invariant(host == reply.host);
- invariant(reply.ok);
-
- LOGV2_DEBUG(20173,
- 3,
- "Updating host {host} based on ismaster reply: {reply_raw}",
- "host"_attr = host,
- "reply_raw"_attr = reply.raw);
-
- // Nodes that are hidden or neither master or secondary are considered down since we can't
- // send any operations to them.
- isUp = !reply.hidden && (reply.isMaster || reply.secondary);
- isMaster = reply.isMaster;
-
- minWireVersion = reply.minWireVersion;
- maxWireVersion = reply.maxWireVersion;
-
- // save a copy if unchanged
- if (!tags.binaryEqual(reply.tags))
- tags = reply.tags.getOwned();
-
- if (reply.latencyMicros >= 0) { // TODO upper bound?
- if (latencyMicros == unknownLatency) {
- latencyMicros = reply.latencyMicros;
- } else {
- // update latency with smoothed moving average (1/4th the delta)
- latencyMicros += (reply.latencyMicros - latencyMicros) / 4;
- }
- }
-
- LOGV2_DEBUG(20174,
- 3,
- "Updating {host} lastWriteDate to {reply_lastWriteDate}",
- "host"_attr = host,
- "reply_lastWriteDate"_attr = reply.lastWriteDate);
- lastWriteDate = reply.lastWriteDate;
-
- LOGV2_DEBUG(20175,
- 3,
- "Updating {host} opTime to {reply_opTime}",
- "host"_attr = host,
- "reply_opTime"_attr = reply.opTime);
- opTime = reply.opTime;
- lastWriteDateUpdateTime = Date_t::now();
-}
-
-SetState::SetState(const MongoURI& uri,
- ReplicaSetChangeNotifier* notifier,
- executor::TaskExecutor* executor)
- : setUri(std::move(uri)),
- name(setUri.getSetName()),
- notifier(notifier),
- executor(executor),
- seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
- latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
- rand(std::random_device()()),
- refreshPeriod(getDefaultRefreshPeriod()) {
- uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
-
- if (name.empty())
- LOGV2_WARNING(20182,
- "Replica set name empty, first node: {seedNodes_begin}",
- "seedNodes_begin"_attr = *(seedNodes.begin()));
-
- // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
- // scan until we start a scan and either find a master or contact all hosts without finding
- // one.
- // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
- // sort nodes after this loop.
- for (auto&& addr : seedNodes) {
- nodes.push_back(Node(addr));
-
- if (addr.host()[0] == '$') {
- invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match.
- isMocked = true;
- } else {
- invariant(!isMocked); // Can't mix and match.
- }
- }
-
- if (kDebugBuild)
- checkInvariants();
-}
-
-HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
- auto hosts = getMatchingHosts(criteria);
-
- if (hosts.empty()) {
- return HostAndPort();
- }
-
- return hosts[0];
-}
-
-std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const {
- switch (criteria.pref) {
- // "Prefered" read preferences are defined in terms of other preferences
- case ReadPreference::PrimaryPreferred: {
- std::vector<HostAndPort> out =
- getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
- // NOTE: the spec says we should use the primary even if tags don't match
- if (!out.empty())
- return out;
- return getMatchingHosts(ReadPreferenceSetting(
- ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
- }
-
- case ReadPreference::SecondaryPreferred: {
- std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting(
- ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
- if (!out.empty())
- return out;
- // NOTE: the spec says we should use the primary even if tags don't match
- return getMatchingHosts(
- ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
- }
-
- case ReadPreference::PrimaryOnly: {
- // NOTE: isMaster implies isUp
- Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (it == nodes.end())
- return {};
- return {it->host};
- }
-
- // The difference between these is handled by Node::matches
- case ReadPreference::SecondaryOnly:
- case ReadPreference::Nearest: {
- std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
- return true;
- };
- // build comparator
- if (criteria.maxStalenessSeconds.count()) {
- auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
- auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
- return a->lastWriteDate < b->lastWriteDate;
- };
- // use only non failed nodes
- std::vector<const Node*> upNodes;
- for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
- if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
- upNodes.push_back(&(*nodeIt));
- }
- }
- auto latestSecNode =
- std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
- if (latestSecNode == upNodes.end()) {
- matchNode = [](const Node& node) -> bool { return false; };
- } else {
- Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
- refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- } else {
- Seconds primaryStaleness = duration_cast<Seconds>(
- masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
- node.lastWriteDate) -
- primaryStaleness + refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- }
-
- std::vector<const Node*> allMatchingNodes;
- BSONForEach(tagElem, criteria.tags.getTagBSON()) {
- uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
- BSONObj tag = tagElem.Obj();
-
- std::vector<const Node*> matchingNodes;
- for (size_t i = 0; i < nodes.size(); i++) {
- if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
- matchNode(nodes[i])) {
- matchingNodes.push_back(&nodes[i]);
- }
- }
-
- // Only consider nodes that satisfy the minOpTime
- if (!criteria.minOpTime.isNull()) {
- std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
- for (size_t i = 0; i < matchingNodes.size(); i++) {
- if (matchingNodes[i]->opTime < criteria.minOpTime) {
- if (i == 0) {
- // If no nodes satisfy the minOpTime criteria, we ignore the
- // minOpTime requirement.
- break;
- }
- matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
- break;
- }
- }
- }
-
- allMatchingNodes.insert(
- allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end());
- }
-
- // don't do more complicated selection if not needed
- if (allMatchingNodes.empty()) {
- return {};
- }
- if (allMatchingNodes.size() == 1) {
- return {allMatchingNodes.front()->host};
- }
-
- // If there are multiple nodes satisfying the minOpTime, next order by latency
- // and don't consider hosts further than a threshold from the closest.
- std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies);
- for (size_t i = 1; i < allMatchingNodes.size(); i++) {
- int64_t distance =
- allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros;
- if (distance >= latencyThresholdMicros) {
- // this node and all remaining ones are too far away
- allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end());
- break;
- }
- }
-
- std::vector<HostAndPort> hosts;
- std::transform(allMatchingNodes.begin(),
- allMatchingNodes.end(),
- std::back_inserter(hosts),
- [](const auto& node) { return node->host; });
-
- // Note that the host list is only deterministic (or random) for the first node.
- // The rest of the list is in matchingNodes order (latency) with one element swapped
- // for the first element.
- if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection
- ? roundRobin++ % hosts.size()
- : rand.nextInt32(hosts.size())) {
- using std::swap;
- swap(hosts[0], hosts[bestHostIdx]);
- }
-
- return hosts;
- }
-
- default:
- uassert(16337, "Unknown read preference", false);
- break;
- }
-}
-
-Node* SetState::findNode(const HostAndPort& host) {
- const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host)
- return nullptr;
-
- return &(*it);
-}
-
-Node* SetState::findOrCreateNode(const HostAndPort& host) {
- // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class
- // must function correctly even with more nodes). If we lift that restriction, we may need
- // to consider alternate algorithms.
- Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host) {
- LOGV2_DEBUG(20176,
- 2,
- "Adding node {host} to our view of replica set {name}",
- "host"_attr = host,
- "name"_attr = name);
- it = nodes.insert(it, Node(host));
- }
- return &(*it);
-}
-
-void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
- Node* node = findNode(reply.host);
- if (!node) {
- LOGV2_DEBUG(20177,
- 2,
- "Skipping application of ismaster reply from {reply_host} since it isn't a "
- "confirmed member of set {name}",
- "reply_host"_attr = reply.host,
- "name"_attr = name);
- return;
- }
-
- node->update(reply);
-}
-
-ConnectionString SetState::confirmedConnectionString() const {
- std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes));
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-ConnectionString SetState::possibleConnectionString() const {
- std::vector<HostAndPort> hosts;
- hosts.reserve(nodes.size());
-
- for (auto& node : nodes) {
- hosts.push_back(node.host);
- }
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-void SetState::notify() {
- if (!waiters.size()) {
- return;
- }
-
- const auto cachedNow = now();
- auto shouldQuickFail = areRefreshRetriesDisabledForTest.load() && !currentScan;
-
- for (auto it = waiters.begin(); it != waiters.end();) {
- if (isDropped) {
- it->promise.setError({ErrorCodes::ShutdownInProgress,
- str::stream() << "ReplicaSetMonitor is shutting down"});
- waiters.erase(it++);
- continue;
- }
-
- auto match = getMatchingHosts(it->criteria);
- if (!match.empty()) {
- // match;
- it->promise.emplaceValue(std::move(match));
- waiters.erase(it++);
- } else if (it->deadline <= cachedNow) {
- LOGV2_DEBUG(
- 20178,
- 1,
- "Unable to statisfy read preference {it_criteria} by deadline {it_deadline}",
- "it_criteria"_attr = it->criteria,
- "it_deadline"_attr = it->deadline);
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else if (shouldQuickFail) {
- LOGV2_DEBUG(20179, 1, "Unable to statisfy read preference because tests fail quickly");
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else {
- it++;
- }
- }
-
- if (waiters.empty()) {
- // No current waiters so we can stop the expedited scanning.
- isExpedited = false;
- rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan);
- }
-}
-
-Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "Could not find host matching read preference "
- << criteria.toString() << " for set " << name);
-}
-
-void SetState::init() {
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- notifier->onFoundSet(name);
-}
-
-void SetState::drop() {
- if (std::exchange(isDropped, true)) {
- // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the
- // RSMM's executor may no longer exist. Thus, only drop once.
- return;
- }
-
- currentScan.reset();
- notify();
-
- if (auto handle = std::exchange(refresherHandle, {})) {
- // Cancel our refresh on the way out
- executor->cancel(handle);
- }
-
- for (auto& node : nodes) {
- if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) {
- // Cancel any isMasters we had scheduled
- executor->cancel(handle);
- }
- }
-
- // No point in notifying if we never started
- if (workingConnStr.isValid()) {
- notifier->onDroppedSet(name);
- }
-}
-
-void SetState::checkInvariants() const {
- bool foundMaster = false;
- for (size_t i = 0; i < nodes.size(); i++) {
- // no empty hosts
- invariant(!nodes[i].host.empty());
-
- if (nodes[i].isMaster) {
- // masters must be up
- invariant(nodes[i].isUp);
-
- // at most one master
- invariant(!foundMaster);
- foundMaster = true;
-
- // if we have a master it should be the same as lastSeenMaster
- invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster);
- }
-
- // should never end up with negative latencies
- invariant(nodes[i].latencyMicros >= 0);
-
- // nodes must be sorted by host with no-dupes
- invariant(i == 0 || (nodes[i - 1].host < nodes[i].host));
- }
-
- // nodes should be a (non-strict) superset of the seedNodes
- invariant(std::includes(
- nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));
-
- if (currentScan) {
- // hostsToScan can't have dups or hosts already in triedHosts.
- std::set<HostAndPort> cantSee = currentScan->triedHosts;
- for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin();
- it != currentScan->hostsToScan.end();
- ++it) {
- invariant(!cantSee.count(*it));
- cantSee.insert(*it); // make sure we don't see this again
- }
-
- // We should only be waitingFor hosts that are in triedHosts
- invariant(std::includes(currentScan->triedHosts.begin(),
- currentScan->triedHosts.end(),
- currentScan->waitingFor.begin(),
- currentScan->waitingFor.end()));
-
- // We should only have unconfirmedReplies if we haven't found a master yet
- invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
- }
-}
-
-template <typename Container>
-void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
-
- // no std::copy_if before c++11
- for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end;
- ++it) {
- if (!triedHosts.count(*it)) {
- hostsToScan.push_back(*it);
- }
- }
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
-}
+namespace {
+AtomicWord<bool> refreshRetriesDisabledForTest{false};
+} // namespace
-void ScanState::retryAllTriedHosts(PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
- // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan.
- std::set_difference(triedHosts.begin(),
- triedHosts.end(),
- waitingFor.begin(),
- waitingFor.end(),
- std::inserter(hostsToScan, hostsToScan.end()));
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
- triedHosts = waitingFor;
+void ReplicaSetMonitor::disableRefreshRetries_forTest() {
+ refreshRetriesDisabledForTest.store(true);
}
-void ScanState::markHostsToScanAsTried() noexcept {
- while (!hostsToScan.empty()) {
- auto host = hostsToScan.front();
- hostsToScan.pop_front();
- /**
- * Mark the popped host as tried to avoid deleting hosts in multiple points.
- * This emulates the final effect of Refresher::getNextStep() on the set.
- */
- triedHosts.insert(host);
- }
+bool ReplicaSetMonitor::areRefreshRetriesDisabledForTest() {
+ return refreshRetriesDisabledForTest.load();
}
} // namespace mongo