diff options
Diffstat (limited to 'src/mongo/client/replica_set_monitor.cpp')
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 1438 |
1 files changed, 7 insertions, 1431 deletions
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index 2110d43cd8a..57f43547cc8 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -41,11 +41,9 @@ #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/read_preference.h" -#include "mongo/client/replica_set_monitor_internal.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/server_options.h" -#include "mongo/logv2/log.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" @@ -67,371 +65,9 @@ using std::vector; // Failpoint for changing the default refresh period MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); -namespace { - -// Pull nested types to top-level scope -typedef ReplicaSetMonitor::IsMasterReply IsMasterReply; -typedef ReplicaSetMonitor::ScanState ScanState; -typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr; -typedef ReplicaSetMonitor::SetState SetState; -typedef ReplicaSetMonitor::SetStatePtr SetStatePtr; -typedef ReplicaSetMonitor::Refresher Refresher; -typedef ScanState::UnconfirmedReplies UnconfirmedReplies; -typedef SetState::Node Node; -typedef SetState::Nodes Nodes; -using executor::TaskExecutor; -using CallbackArgs = TaskExecutor::CallbackArgs; -using CallbackHandle = TaskExecutor::CallbackHandle; - -// Intentionally chosen to compare worse than all known latencies. -const int64_t unknownLatency = numeric_limits<int64_t>::max(); - -const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); -const Milliseconds kExpeditedRefreshPeriod(500); -AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests. - -// -// Helpers for stl algorithms -// - -bool isMaster(const Node& node) { - return node.isMaster; -} - -bool opTimeGreater(const Node* lhs, const Node* rhs) { - return lhs->opTime > rhs->opTime; -} - -bool compareLatencies(const Node* lhs, const Node* rhs) { - // NOTE: this automatically compares Node::unknownLatency worse than all others. - return lhs->latencyMicros < rhs->latencyMicros; -} - -bool hostsEqual(const Node& lhs, const HostAndPort& rhs) { - return lhs.host == rhs; -} - -// Allows comparing two Nodes, or a HostAndPort and a Node. -// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL -// implementations. For simplicity, no comparator should be used with collections of just -// HostAndPort. -struct CompareHosts { - bool operator()(const Node& lhs, const Node& rhs) { - return lhs.host < rhs.host; - } - bool operator()(const Node& lhs, const HostAndPort& rhs) { - return lhs.host < rhs; - } - bool operator()(const HostAndPort& lhs, const Node& rhs) { - return lhs < rhs.host; - } - bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) { - return lhs < rhs; - } -} compareHosts; // like an overloaded function, but able to pass to stl algorithms - -// The following structs should be treated as functions returning a UnaryPredicate. -// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost)); -// They all hold their constructor argument by reference. - -struct HostIs { - explicit HostIs(const HostAndPort& host) : _host(host) {} - bool operator()(const HostAndPort& host) { - return host == _host; - } - bool operator()(const Node& node) { - return node.host == _host; - } - const HostAndPort& _host; -}; - -struct HostNotIn { - explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {} - bool operator()(const HostAndPort& host) { - return !_hosts.count(host); - } - bool operator()(const Node& node) { - return !_hosts.count(node.host); - } - const std::set<HostAndPort>& _hosts; -}; - -int32_t pingTimeMillis(const Node& node) { - auto latencyMillis = node.latencyMicros / 1000; - if (latencyMillis > numeric_limits<int32_t>::max()) { - // In particular, Node::unknownLatency does not fit in an int32. - return numeric_limits<int32_t>::max(); - } - return latencyMillis; -} - -/** - * Replica set refresh period on the task executor. - */ -const Seconds kDefaultRefreshPeriod(30); -} // namespace - -// If we cannot find a host after 15 seconds of refreshing, give up -const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); - // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; -Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { - Seconds r = kDefaultRefreshPeriod; - static constexpr auto kPeriodField = "period"_sd; - modifyReplicaSetMonitorDefaultRefreshPeriod.executeIf( - [&r](const BSONObj& data) { r = Seconds{data.getIntField(kPeriodField)}; }, - [](const BSONObj& data) { return data.hasField(kPeriodField); }); - return r; -} - -ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {} - -ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) - : ReplicaSetMonitor( - std::make_shared<SetState>(uri, - &ReplicaSetMonitorManager::get()->getNotifier(), - ReplicaSetMonitorManager::get()->getExecutor())) {} - -void ReplicaSetMonitor::init() { - if (areRefreshRetriesDisabledForTest.load()) { - // This is for MockReplicaSet. Those tests want to control when scanning happens. - LOGV2_WARNING(20180, - "*** Not starting background refresh because refresh retries are disabled."); - return; - } - - { - stdx::lock_guard lk(_state->mutex); - _state->init(); - } -} - -void ReplicaSetMonitor::drop() { - { - stdx::lock_guard lk(_state->mutex); - _state->drop(); - } -} - -ReplicaSetMonitor::~ReplicaSetMonitor() { - drop(); -} - -template <typename Callback> -auto ReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const { - auto wrappedCallback = [cb = std::forward<Callback>(cb), - anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable { - if (ErrorCodes::isCancelationError(cbArgs.status)) { - // Do no more work if we're removed or canceled - return; - } - invariant(cbArgs.status); - - stdx::lock_guard lk(anchor->mutex); - if (anchor->isDropped) { - return; - } - - cb(cbArgs); - }; - return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback)); -} - -void ReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) { - // Reschedule the refresh - - if (!executor || isMocked) { - // Without an executor, we can't do refreshes -- we're in a test - return; - } - - if (isDropped) { // already removed so no need to refresh - LOGV2_DEBUG(20162, - 1, - "Stopping refresh for replica set {name} because it's removed", - "name"_attr = name); - return; - } - - Milliseconds period = refreshPeriod; - if (isExpedited) { - period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod); - } - - auto currentTime = now(); - auto possibleNextScanTime = currentTime + period; - if (refresherHandle && // - (strategy == SchedulingStrategy::kKeepEarlyScan) && // - (nextScanTime > currentTime) && // - (possibleNextScanTime >= nextScanTime)) { - // If the next scan would be sooner than our desired, why cancel? - return; - } - - // Cancel out the last refresh - if (auto currentHandle = std::exchange(refresherHandle, {})) { - executor->cancel(currentHandle); - } - - nextScanTime = possibleNextScanTime; - LOGV2_DEBUG(20163, - 1, - "Next replica set scan scheduled for {nextScanTime}", - "nextScanTime"_attr = nextScanTime); - auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) { - if (cbArgs.myHandle != refresherHandle) - return; // We've been replaced! - - // It is possible that a waiter will have already expired by the point of this rescan. - // Thus we notify here to trigger that logic. - notify(); - - _ensureScanInProgress(shared_from_this()); - - // And now we set up the next one - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - LOGV2_DEBUG(20164, - 1, - "Cant schedule refresh for {name}. Executor shutdown in progress", - "name"_attr = name); - return; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL(20184, - "Can't continue refresh for replica set {name} due to {swHandle_getStatus}", - "name"_attr = name, - "swHandle_getStatus"_attr = redact(swHandle.getStatus())); - fassertFailed(40140); - } - - refresherHandle = std::move(swHandle.getValue()); -} - -SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, - Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait) - .then([](const auto& hosts) { - invariant(hosts.size()); - return hosts[0]; - }) - .semi(); -} - -SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait).semi(); -} - -Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - - stdx::lock_guard<Latch> lk(_state->mutex); - if (_state->isDropped) { - return Status(ErrorCodes::ReplicaSetMonitorRemoved, - str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); - } - - auto out = _state->getMatchingHosts(criteria); - if (!out.empty()) - return {std::move(out)}; - - // Fail early without doing any more work if the timeout has already expired. - if (maxWait <= Milliseconds(0)) - return _state->makeUnsatisfedReadPrefError(criteria); - - // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is - // dealing with maxWait. - auto pf = makePromiseFuture<decltype(out)>(); - _state->waiters.emplace_back( - SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)}); - - // This must go after we set up the wait state to correctly handle unittests using - // MockReplicaSet. - _ensureScanInProgress(_state); - - // Switch to expedited scanning. - _state->isExpedited = true; - _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - - return std::move(pf.future); -} -HostAndPort ReplicaSetMonitor::getMasterOrUassert() { - return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); -} - -void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - if (node) - node->markFailed(status); - if (kDebugBuild) - _state->checkInvariants(); -} - -bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isMaster : false; -} - -bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isUp : false; -} - -int ReplicaSetMonitor::getMinWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int minVersion = 0; - for (const auto& host : _state->nodes) { - if (host.isUp) { - minVersion = std::max(minVersion, host.minWireVersion); - } - } - - return minVersion; -} - -int ReplicaSetMonitor::getMaxWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int maxVersion = std::numeric_limits<int>::max(); - for (const auto& host : _state->nodes) { - if (host.isUp) { - maxVersion = std::min(maxVersion, host.maxWireVersion); - } - } - - return maxVersion; -} - -std::string ReplicaSetMonitor::getName() const { - // name is const so don't need to lock - return _state->name; -} - -std::string ReplicaSetMonitor::getServerAddress() const { - stdx::lock_guard<Latch> lk(_state->mutex); - // We return our setUri until first confirmation - return _state->seedConnStr.isValid() ? _state->seedConnStr.toString() - : _state->setUri.connectionString().toString(); -} - -const MongoURI& ReplicaSetMonitor::getOriginalUri() const { - // setUri is const so no need to lock. - return _state->setUri; -} - -bool ReplicaSetMonitor::contains(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - return _state->seedNodes.count(host); -} - shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name, const set<HostAndPort>& servers) { return ReplicaSetMonitorManager::get()->getOrCreateMonitor( @@ -458,40 +94,6 @@ ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() { return ReplicaSetMonitorManager::get()->getNotifier(); } -// TODO move to correct order with non-statics before pushing -void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const { - stdx::lock_guard<Latch> lk(_state->mutex); - - BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName())); - if (forFTDC) { - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node)); - } - return; - } - - // NOTE: the format here must be consistent for backwards compatibility - BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts")); - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - - BSONObjBuilder builder; - builder.append("addr", node.host.toString()); - builder.append("ok", node.isUp); - builder.append("ismaster", node.isMaster); // intentionally not camelCase - builder.append("hidden", false); // we don't keep hidden nodes in the set - builder.append("secondary", node.isUp && !node.isMaster); - builder.append("pingTimeMillis", pingTimeMillis(node)); - - if (!node.tags.isEmpty()) { - builder.append("tags", node.tags); - } - - hosts.append(builder.obj()); - } -} - void ReplicaSetMonitor::shutdown() { ReplicaSetMonitorManager::get()->shutdown(); } @@ -500,1041 +102,15 @@ void ReplicaSetMonitor::cleanup() { ReplicaSetMonitorManager::get()->removeAllMonitors(); } -void ReplicaSetMonitor::disableRefreshRetries_forTest() { - areRefreshRetriesDisabledForTest.store(true); -} - -bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const { - stdx::lock_guard<Latch> lk(_state->mutex); - - for (const auto& node : _state->nodes) { - if (node.isMaster) { - return true; - } - } - - return false; -} - -void ReplicaSetMonitor::runScanForMockReplicaSet() { - stdx::lock_guard<Latch> lk(_state->mutex); - _ensureScanInProgress(_state); - - // This function should only be called from tests using MockReplicaSet and they should use the - // synchronous path to complete before returning. - invariant(_state->currentScan == nullptr); -} - -void ReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) { - Refresher(state).scheduleNetworkRequests(); -} - -Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { - if (_scan) { - _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - _scan->retryAllTriedHosts(_set->rand); - return; // participate in in-progress scan - } - - startNewScan(); -} - -void Refresher::scheduleNetworkRequests() { - for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) { - if (!_set->executor || _set->isMocked) { - // If we're mocked, just schedule an isMaster - scheduleIsMaster(ns.host); - continue; - } - - // cancel any scheduled isMaster calls that haven't yet been called - Node* node = _set->findOrCreateNode(ns.host); - if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) { - _set->executor->cancel(handle); - } - - // ensure that the call to isMaster is scheduled at most every 500ms - auto swHandle = - _set->scheduleWorkAt(node->nextPossibleIsMasterCall, - [*this, host = ns.host](const CallbackArgs& cbArgs) mutable { - scheduleIsMaster(host); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - _scan->markHostsToScanAsTried(); - break; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL( - 20185, - "Can't continue scan for replica set {set_name} due to {swHandle_getStatus}", - "set_name"_attr = _set->name, - "swHandle_getStatus"_attr = redact(swHandle.getStatus())); - fassertFailed(31176); - } - - node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle)); - } - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::scheduleIsMaster(const HostAndPort& host) { - if (_set->isMocked) { - // MockReplicaSet only works with DBClient-style access since it injects itself into the - // ScopedDbConnection pool connection creation. - try { - ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count()); - - auto timer = Timer(); - auto reply = BSONObj(); - bool ignoredOutParam = false; - conn->isMaster(ignoredOutParam, &reply); - conn.done(); // return to pool on success. - - receivedIsMaster(host, timer.micros(), reply); - } catch (DBException& ex) { - failedHost(host, ex.toStatus()); - } - - return; - } - - auto request = executor::RemoteCommandRequest( - host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout); - request.sslMode = _set->setUri.getSSLMode(); - auto status = - _set->executor - ->scheduleRemoteCommand( - std::move(request), - [copy = *this, host, timer = Timer()]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { - stdx::lock_guard lk(copy._set->mutex); - // Ignore the reply and return if we are no longer the current scan. This might - // happen if it was decided that the host we were contacting isn't part of the - // set. - if (copy._scan != copy._set->currentScan) { - return; - } - - // ensure that isMaster calls occur at most 500ms after the previous call ended - if (auto node = copy._set->findNode(host)) { - node->nextPossibleIsMasterCall = - copy._set->executor->now() + Milliseconds(500); - } - - if (result.response.isOK()) { - // Not using result.response.elapsedMillis because higher precision is - // useful for computing the rolling average. - copy.receivedIsMaster(host, timer.micros(), result.response.data); - } else { - copy.failedHost(host, result.response.status); - } - - // This reply may have discovered new hosts to contact so we need to schedule - // them. - copy.scheduleNetworkRequests(); - }) - .getStatus(); - - if (!status.isOK()) { - failedHost(host, status); - // This is only called from scheduleNetworkRequests() which will still be looping, so we - // don't need to call it here after updating the state. - } -} - -Refresher::NextStep Refresher::getNextStep() { - // No longer the current scan - if (_scan != _set->currentScan) { - return NextStep(NextStep::DONE); - } - - // Wait for all dispatched hosts to return before trying any fallback hosts. - if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) { - return NextStep(NextStep::WAIT); - } - - // If we haven't yet found a master, try contacting unconfirmed hosts - if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) { - _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand); - _scan->possibleNodes.clear(); - } - - if (_scan->hostsToScan.empty()) { - // We've tried all hosts we can, so nothing more to do in this round. - if (!_scan->foundUpMaster) { - LOGV2_WARNING( - 20181, "Unable to reach primary for set {set_name}", "set_name"_attr = _set->name); - - // Since we've talked to everyone we could but still didn't find a primary, we - // do the best we can, and assume all unconfirmedReplies are actually from nodes - // in the set (we've already confirmed that they think they are). This is - // important since it allows us to bootstrap to a usable state even if we are - // unable to talk to a master while starting up. As soon as we are able to - // contact a master, we will remove any nodes that it doesn't think are part of - // the set, undoing the damage we cause here. - - // NOTE: we don't modify seedNodes or notify about set membership change in this - // case since it hasn't been confirmed by a master. - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - _set->findOrCreateNode(it->first)->update(it->second); - } - - auto connStr = _set->possibleConnectionString(); - if (connStr != _set->workingConnStr) { - _set->workingConnStr = std::move(connStr); - _set->notifier->onPossibleSet(_set->workingConnStr); - } - - // If at some point we care about lacking a primary, on it here - _set->lastSeenMaster = {}; - } - - if (_scan->foundAnyUpNodes) { - _set->consecutiveFailedScans = 0; - } else { - auto nScans = _set->consecutiveFailedScans++; - if (nScans <= 10 || nScans % 10 == 0) { - LOGV2(20165, - "Cannot reach any nodes for set {set_name}. Please check network " - "connectivity and the status of the set. This has happened for " - "{set_consecutiveFailedScans} checks in a row.", - "set_name"_attr = _set->name, - "set_consecutiveFailedScans"_attr = _set->consecutiveFailedScans); - } - } - - // Makes sure all other Refreshers in this round return DONE - _set->currentScan.reset(); - _set->notify(); - - LOGV2_DEBUG(20166, - 1, - "Refreshing replica set {set_name} took {scan_timer_millis}ms", - "set_name"_attr = _set->name, - "scan_timer_millis"_attr = _scan->timer.millis()); - - return NextStep(NextStep::DONE); - } - - // Pop and return the next hostToScan. - HostAndPort host = _scan->hostsToScan.front(); - _scan->hostsToScan.pop_front(); - _scan->waitingFor.insert(host); - _scan->triedHosts.insert(host); - - return NextStep(NextStep::CONTACT_HOST, host); -} - -void Refresher::receivedIsMaster(const HostAndPort& from, - int64_t latencyMicros, - const BSONObj& replyObj) { - _scan->waitingFor.erase(from); - - const IsMasterReply reply(from, latencyMicros, replyObj); - - // Handle various failure cases - if (!reply.ok) { - failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"}); - return; - } - - if (reply.setName != _set->name) { - if (reply.raw["isreplicaset"].trueValue()) { - // The reply came from a node in the state referred to as RSGhost in the SDAM - // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event, - // if a reply from a ghost offers a list of possible other members of the replica set, - // and if this refresher has yet to find the replica set master, we add hosts listed in - // the reply to the list of possible replica set members. - if (!_scan->foundUpMaster) { - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - } - } else { - LOGV2_ERROR(20183, - "replset name mismatch: expected \"{set_name}\", but remote node {from} " - "has replset name \"{reply_setName}\", ismaster: {replyObj}", - "set_name"_attr = _set->name, - "from"_attr = from, - "reply_setName"_attr = reply.setName, - "replyObj"_attr = replyObj); - } - - failedHost(from, - {ErrorCodes::InconsistentReplicaSetNames, - str::stream() << "Target replica set name " << reply.setName - << " does not match the monitored set name " << _set->name}); - return; - } - - if (reply.isMaster) { - Status status = receivedIsMasterFromMaster(from, reply); - if (!status.isOK()) { - failedHost(from, status); - return; - } - } - - if (_scan->foundUpMaster) { - // We only update a Node if a master has confirmed it is in the set. - _set->updateNodeIfInNodes(reply); - } else { - // Populate possibleNodes. - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - _scan->unconfirmedReplies[from] = reply; - } - - // _set->nodes may still not have any nodes with isUp==true, but we have at least found a - // connectible host that is that claims to be in the set. - _scan->foundAnyUpNodes = true; - - _set->notify(); - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::failedHost(const HostAndPort& host, const Status& status) { - _scan->waitingFor.erase(host); - - Node* node = _set->findNode(host); - if (node) - node->markFailed(status); - - if (_scan->waitingFor.empty()) { - // If this was the last host that needed a response, we should notify the SetState so that - // we can fail any waiters that have timed out. - _set->notify(); - } -} - -void Refresher::startNewScan() { - // The heuristics we use in deciding the order to contact hosts are designed to find a - // master as quickly as possible. This is because we can't use any hosts we find until - // we either get the latest set of members from a master or talk to all possible hosts - // without finding a master. - - // TODO It might make sense to check down nodes first if the last seen master is still - // marked as up. - - _scan = std::make_shared<ScanState>(); - _set->currentScan = _scan; - - int upNodes = 0; - for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) { - if (it->isUp) { - // _scan the nodes we think are up first - _scan->hostsToScan.push_front(it->host); - upNodes++; - } else { - _scan->hostsToScan.push_back(it->host); - } - } - - // shuffle the queue, but keep "up" nodes at the front - std::shuffle( - _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg()); - std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg()); - - if (!_set->lastSeenMaster.empty()) { - // move lastSeenMaster to front of queue - std::stable_partition( - _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster)); - } -} - -Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) { - invariant(reply.isMaster); - - // Reject if config version is older. This is for backwards compatibility with nodes in pv0 - // since they don't have the same ordering with pv1 electionId. - if (reply.configVersion < _set->configVersion) { - return { - ErrorCodes::NotMaster, - str::stream() << "Node " << from << " believes it is primary, but its config version " - << reply.configVersion << " is older than the most recent config version " - << _set->configVersion}; - } - - if (reply.electionId.isSet()) { - // ElectionIds are only comparable if they are of the same protocol version. However, since - // isMaster has no protocol version field, we use the configVersion instead. This works - // because configVersion needs to be incremented whenever the protocol version is changed. - if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() && - _set->maxElectionId.compare(reply.electionId) > 0) { - return { - ErrorCodes::NotMaster, - str::stream() << "Node " << from << " believes it is primary, but its election id " - << reply.electionId << " is older than the most recent election id " - << _set->maxElectionId}; - } - - _set->maxElectionId = reply.electionId; - } - - _set->configVersion = reply.configVersion; - - // Mark all nodes as not master. We will mark ourself as master before releasing the lock. - // NOTE: we use a "last-wins" policy if multiple hosts claim to be master. - for (size_t i = 0; i < _set->nodes.size(); i++) { - _set->nodes[i].isMaster = false; - } - - // Check if the master agrees with our current list of nodes. - // REMINDER: both _set->nodes and reply.members are sorted. - if (_set->nodes.size() != reply.members.size() || - !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) { - LOGV2_DEBUG(20167, - 2, - "Adjusting nodes in our view of replica set {set_name} based on master reply: " - "{reply_raw}", - "set_name"_attr = _set->name, - "reply_raw"_attr = redact(reply.raw)); - - // remove non-members from _set->nodes - _set->nodes.erase( - std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)), - _set->nodes.end()); - - // add new members to _set->nodes - for (auto& host : reply.members) { - _set->findOrCreateNode(host); - } - - // replace hostToScan queue with untried normal hosts. can both add and remove - // hosts from the queue. - _scan->hostsToScan.clear(); - _scan->enqueAllUntriedHosts(reply.members, _set->rand); - - if (!_scan->waitingFor.empty()) { - // make sure we don't wait for any hosts that aren't considered members - std::set<HostAndPort> newWaitingFor; - std::set_intersection(reply.members.begin(), - reply.members.end(), - _scan->waitingFor.begin(), - _scan->waitingFor.end(), - std::inserter(newWaitingFor, newWaitingFor.end())); - _scan->waitingFor.swap(newWaitingFor); - } - } - - bool changedHosts = reply.members != _set->seedNodes; - bool changedPrimary = reply.host != _set->lastSeenMaster; - if (changedHosts || changedPrimary) { - ++_set->seedGen; - _set->seedNodes = reply.members; - _set->seedConnStr = _set->confirmedConnectionString(); - - // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare - // and we want to record our changes - LOGV2(20168, - "Confirmed replica set for {set_name} is {set_seedConnStr}", - "set_name"_attr = _set->name, - "set_seedConnStr"_attr = _set->seedConnStr); - - _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives); - } - - // Update our working string - _set->workingConnStr = _set->seedConnStr; - - // Update other nodes's information based on replies we've already seen - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - // this ignores replies from hosts not in _set->nodes (as modified above) - _set->updateNodeIfInNodes(it->second); - } - _scan->unconfirmedReplies.clear(); - - _scan->foundUpMaster = true; - _set->lastSeenMaster = reply.host; - - return Status::OK(); -} - -void IsMasterReply::parse(const BSONObj& obj) { - try { - raw = obj.getOwned(); // don't use obj again after this line - - ok = raw["ok"].trueValue(); - if (!ok) - return; - - setName = raw["setName"].str(); - hidden = raw["hidden"].trueValue(); - secondary = raw["secondary"].trueValue(); - - minWireVersion = raw["minWireVersion"].numberInt(); - maxWireVersion = raw["maxWireVersion"].numberInt(); - - // hidden nodes can't be master, even if they claim to be. - isMaster = !hidden && raw["ismaster"].trueValue(); - - if (isMaster && raw.hasField("electionId")) { - electionId = raw["electionId"].OID(); - } - - configVersion = raw["setVersion"].numberInt(); - - const string primaryString = raw["primary"].str(); - primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString); - - // both hosts and passives, but not arbiters, are considered "normal hosts" - members.clear(); - BSONForEach(host, raw.getObjectField("hosts")) { - members.insert(HostAndPort(host.String())); - } - BSONForEach(host, raw.getObjectField("passives")) { - members.insert(HostAndPort(host.String())); - passives.insert(HostAndPort(host.String())); - } - - tags = raw.getObjectField("tags"); - BSONObj lastWriteField = raw.getObjectField("lastWrite"); - if (!lastWriteField.isEmpty()) { - if (auto lastWrite = lastWriteField["lastWriteDate"]) { - lastWriteDate = lastWrite.date(); - } - - uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime)); - } - } catch (const std::exception& e) { - ok = false; - LOGV2(20169, - "exception while parsing isMaster reply: {e_what} {obj}", - "e_what"_attr = e.what(), - "obj"_attr = obj); - } -} - -Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {} - -void Node::markFailed(const Status& status) { - if (isUp) { - LOGV2(20170, - "Marking host {host} as failed{causedBy_status}", - "host"_attr = host, - "causedBy_status"_attr = causedBy(redact(status))); - - isUp = false; - } - - isMaster = false; -} - -bool Node::matches(const ReadPreference pref) const { - if (!isUp) { - LOGV2_DEBUG(20171, 3, "Host {host} is not up", "host"_attr = host); - return false; - } - - LOGV2_DEBUG(20172, - 3, - "Host {host} is {isMaster_primary_not_primary}", - "host"_attr = host, - "isMaster_primary_not_primary"_attr = (isMaster ? "primary" : "not primary")); - if (pref == ReadPreference::PrimaryOnly) { - return isMaster; - } - - if (pref == ReadPreference::SecondaryOnly) { - return !isMaster; - } - - return true; -} - -bool Node::matches(const BSONObj& tag) const { - BSONForEach(tagCriteria, tag) { - if (SimpleBSONElementComparator::kInstance.evaluate( - this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) { - return false; - } - } - - return true; -} - -void Node::update(const IsMasterReply& reply) { - invariant(host == reply.host); - invariant(reply.ok); - - LOGV2_DEBUG(20173, - 3, - "Updating host {host} based on ismaster reply: {reply_raw}", - "host"_attr = host, - "reply_raw"_attr = reply.raw); - - // Nodes that are hidden or neither master or secondary are considered down since we can't - // send any operations to them. - isUp = !reply.hidden && (reply.isMaster || reply.secondary); - isMaster = reply.isMaster; - - minWireVersion = reply.minWireVersion; - maxWireVersion = reply.maxWireVersion; - - // save a copy if unchanged - if (!tags.binaryEqual(reply.tags)) - tags = reply.tags.getOwned(); - - if (reply.latencyMicros >= 0) { // TODO upper bound? - if (latencyMicros == unknownLatency) { - latencyMicros = reply.latencyMicros; - } else { - // update latency with smoothed moving average (1/4th the delta) - latencyMicros += (reply.latencyMicros - latencyMicros) / 4; - } - } - - LOGV2_DEBUG(20174, - 3, - "Updating {host} lastWriteDate to {reply_lastWriteDate}", - "host"_attr = host, - "reply_lastWriteDate"_attr = reply.lastWriteDate); - lastWriteDate = reply.lastWriteDate; - - LOGV2_DEBUG(20175, - 3, - "Updating {host} opTime to {reply_opTime}", - "host"_attr = host, - "reply_opTime"_attr = reply.opTime); - opTime = reply.opTime; - lastWriteDateUpdateTime = Date_t::now(); -} - -SetState::SetState(const MongoURI& uri, - ReplicaSetChangeNotifier* notifier, - executor::TaskExecutor* executor) - : setUri(std::move(uri)), - name(setUri.getSetName()), - notifier(notifier), - executor(executor), - seedNodes(setUri.getServers().begin(), setUri.getServers().end()), - latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)), - rand(std::random_device()()), - refreshPeriod(getDefaultRefreshPeriod()) { - uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); - - if (name.empty()) - LOGV2_WARNING(20182, - "Replica set name empty, first node: {seedNodes_begin}", - "seedNodes_begin"_attr = *(seedNodes.begin())); - - // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a - // scan until we start a scan and either find a master or contact all hosts without finding - // one. - // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to - // sort nodes after this loop. - for (auto&& addr : seedNodes) { - nodes.push_back(Node(addr)); - - if (addr.host()[0] == '$') { - invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match. - isMocked = true; - } else { - invariant(!isMocked); // Can't mix and match. - } - } - - if (kDebugBuild) - checkInvariants(); -} - -HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const { - auto hosts = getMatchingHosts(criteria); - - if (hosts.empty()) { - return HostAndPort(); - } - - return hosts[0]; -} - -std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const { - switch (criteria.pref) { - // "Prefered" read preferences are defined in terms of other preferences - case ReadPreference::PrimaryPreferred: { - std::vector<HostAndPort> out = - getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); - // NOTE: the spec says we should use the primary even if tags don't match - if (!out.empty()) - return out; - return getMatchingHosts(ReadPreferenceSetting( - ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); - } - - case ReadPreference::SecondaryPreferred: { - std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting( - ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); - if (!out.empty()) - return out; - // NOTE: the spec says we should use the primary even if tags don't match - return getMatchingHosts( - ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); - } - - case ReadPreference::PrimaryOnly: { - // NOTE: isMaster implies isUp - Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (it == nodes.end()) - return {}; - return {it->host}; - } - - // The difference between these is handled by Node::matches - case ReadPreference::SecondaryOnly: - case ReadPreference::Nearest: { - std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool { - return true; - }; - // build comparator - if (criteria.maxStalenessSeconds.count()) { - auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) { - auto writeDateCmp = [](const Node* a, const Node* b) -> bool { - return a->lastWriteDate < b->lastWriteDate; - }; - // use only non failed nodes - std::vector<const Node*> upNodes; - for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) { - if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) { - upNodes.push_back(&(*nodeIt)); - } - } - auto latestSecNode = - std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp); - if (latestSecNode == upNodes.end()) { - matchNode = [](const Node& node) -> bool { return false; }; - } else { - Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + - refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } else { - Seconds primaryStaleness = duration_cast<Seconds>( - masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate); - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(node.lastWriteDateUpdateTime - - node.lastWriteDate) - - primaryStaleness + refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } - - std::vector<const Node*> allMatchingNodes; - BSONForEach(tagElem, criteria.tags.getTagBSON()) { - uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj()); - BSONObj tag = tagElem.Obj(); - - std::vector<const Node*> matchingNodes; - for (size_t i = 0; i < nodes.size(); i++) { - if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) && - matchNode(nodes[i])) { - matchingNodes.push_back(&nodes[i]); - } - } - - // Only consider nodes that satisfy the minOpTime - if (!criteria.minOpTime.isNull()) { - std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater); - for (size_t i = 0; i < matchingNodes.size(); i++) { - if (matchingNodes[i]->opTime < criteria.minOpTime) { - if (i == 0) { - // If no nodes satisfy the minOpTime criteria, we ignore the - // minOpTime requirement. - break; - } - matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); - break; - } - } - } - - allMatchingNodes.insert( - allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end()); - } - - // don't do more complicated selection if not needed - if (allMatchingNodes.empty()) { - return {}; - } - if (allMatchingNodes.size() == 1) { - return {allMatchingNodes.front()->host}; - } - - // If there are multiple nodes satisfying the minOpTime, next order by latency - // and don't consider hosts further than a threshold from the closest. - std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies); - for (size_t i = 1; i < allMatchingNodes.size(); i++) { - int64_t distance = - allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros; - if (distance >= latencyThresholdMicros) { - // this node and all remaining ones are too far away - allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end()); - break; - } - } - - std::vector<HostAndPort> hosts; - std::transform(allMatchingNodes.begin(), - allMatchingNodes.end(), - std::back_inserter(hosts), - [](const auto& node) { return node->host; }); - - // Note that the host list is only deterministic (or random) for the first node. - // The rest of the list is in matchingNodes order (latency) with one element swapped - // for the first element. - if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection - ? roundRobin++ % hosts.size() - : rand.nextInt32(hosts.size())) { - using std::swap; - swap(hosts[0], hosts[bestHostIdx]); - } - - return hosts; - } - - default: - uassert(16337, "Unknown read preference", false); - break; - } -} - -Node* SetState::findNode(const HostAndPort& host) { - const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) - return nullptr; - - return &(*it); -} - -Node* SetState::findOrCreateNode(const HostAndPort& host) { - // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class - // must function correctly even with more nodes). If we lift that restriction, we may need - // to consider alternate algorithms. - Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) { - LOGV2_DEBUG(20176, - 2, - "Adding node {host} to our view of replica set {name}", - "host"_attr = host, - "name"_attr = name); - it = nodes.insert(it, Node(host)); - } - return &(*it); -} - -void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { - Node* node = findNode(reply.host); - if (!node) { - LOGV2_DEBUG(20177, - 2, - "Skipping application of ismaster reply from {reply_host} since it isn't a " - "confirmed member of set {name}", - "reply_host"_attr = reply.host, - "name"_attr = name); - return; - } - - node->update(reply); -} - -ConnectionString SetState::confirmedConnectionString() const { - std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes)); - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -ConnectionString SetState::possibleConnectionString() const { - std::vector<HostAndPort> hosts; - hosts.reserve(nodes.size()); - - for (auto& node : nodes) { - hosts.push_back(node.host); - } - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -void SetState::notify() { - if (!waiters.size()) { - return; - } - - const auto cachedNow = now(); - auto shouldQuickFail = areRefreshRetriesDisabledForTest.load() && !currentScan; - - for (auto it = waiters.begin(); it != waiters.end();) { - if (isDropped) { - it->promise.setError({ErrorCodes::ShutdownInProgress, - str::stream() << "ReplicaSetMonitor is shutting down"}); - waiters.erase(it++); - continue; - } - - auto match = getMatchingHosts(it->criteria); - if (!match.empty()) { - // match; - it->promise.emplaceValue(std::move(match)); - waiters.erase(it++); - } else if (it->deadline <= cachedNow) { - LOGV2_DEBUG( - 20178, - 1, - "Unable to statisfy read preference {it_criteria} by deadline {it_deadline}", - "it_criteria"_attr = it->criteria, - "it_deadline"_attr = it->deadline); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else if (shouldQuickFail) { - LOGV2_DEBUG(20179, 1, "Unable to statisfy read preference because tests fail quickly"); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else { - it++; - } - } - - if (waiters.empty()) { - // No current waiters so we can stop the expedited scanning. - isExpedited = false; - rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan); - } -} - -Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const { - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "Could not find host matching read preference " - << criteria.toString() << " for set " << name); -} - -void SetState::init() { - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - notifier->onFoundSet(name); -} - -void SetState::drop() { - if (std::exchange(isDropped, true)) { - // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the - // RSMM's executor may no longer exist. Thus, only drop once. - return; - } - - currentScan.reset(); - notify(); - - if (auto handle = std::exchange(refresherHandle, {})) { - // Cancel our refresh on the way out - executor->cancel(handle); - } - - for (auto& node : nodes) { - if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) { - // Cancel any isMasters we had scheduled - executor->cancel(handle); - } - } - - // No point in notifying if we never started - if (workingConnStr.isValid()) { - notifier->onDroppedSet(name); - } -} - -void SetState::checkInvariants() const { - bool foundMaster = false; - for (size_t i = 0; i < nodes.size(); i++) { - // no empty hosts - invariant(!nodes[i].host.empty()); - - if (nodes[i].isMaster) { - // masters must be up - invariant(nodes[i].isUp); - - // at most one master - invariant(!foundMaster); - foundMaster = true; - - // if we have a master it should be the same as lastSeenMaster - invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster); - } - - // should never end up with negative latencies - invariant(nodes[i].latencyMicros >= 0); - - // nodes must be sorted by host with no-dupes - invariant(i == 0 || (nodes[i - 1].host < nodes[i].host)); - } - - // nodes should be a (non-strict) superset of the seedNodes - invariant(std::includes( - nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts)); - - if (currentScan) { - // hostsToScan can't have dups or hosts already in triedHosts. - std::set<HostAndPort> cantSee = currentScan->triedHosts; - for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin(); - it != currentScan->hostsToScan.end(); - ++it) { - invariant(!cantSee.count(*it)); - cantSee.insert(*it); // make sure we don't see this again - } - - // We should only be waitingFor hosts that are in triedHosts - invariant(std::includes(currentScan->triedHosts.begin(), - currentScan->triedHosts.end(), - currentScan->waitingFor.begin(), - currentScan->waitingFor.end())); - - // We should only have unconfirmedReplies if we haven't found a master yet - invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty()); - } -} - -template <typename Container> -void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - - // no std::copy_if before c++11 - for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end; - ++it) { - if (!triedHosts.count(*it)) { - hostsToScan.push_back(*it); - } - } - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); -} +namespace { +AtomicWord<bool> refreshRetriesDisabledForTest{false}; +} // namespace -void ScanState::retryAllTriedHosts(PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan. - std::set_difference(triedHosts.begin(), - triedHosts.end(), - waitingFor.begin(), - waitingFor.end(), - std::inserter(hostsToScan, hostsToScan.end())); - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); - triedHosts = waitingFor; +void ReplicaSetMonitor::disableRefreshRetries_forTest() { + refreshRetriesDisabledForTest.store(true); } -void ScanState::markHostsToScanAsTried() noexcept { - while (!hostsToScan.empty()) { - auto host = hostsToScan.front(); - hostsToScan.pop_front(); - /** - * Mark the popped host as tried to avoid deleting hosts in multiple points. - * This emulates the final effect of Refresher::getNextStep() on the set. - */ - triedHosts.insert(host); - } +bool ReplicaSetMonitor::areRefreshRetriesDisabledForTest() { + return refreshRetriesDisabledForTest.load(); } } // namespace mongo |