diff options
Diffstat (limited to 'src/mongo/client/streamable_replica_set_monitor.cpp')
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 252 |
1 files changed, 135 insertions, 117 deletions
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 76cbfbedba6..a41e1ffa73e 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -118,6 +118,43 @@ double pingTimeMillis(const ServerDescriptionPtr& serverDescription) { } constexpr auto kZeroMs = Milliseconds(0); + +Status makeUnsatisfiedReadPrefError(const std::string& name, + const ReadPreferenceSetting& criteria) { + return Status(ErrorCodes::FailedToSatisfyReadPreference, + str::stream() << "Could not find host matching read preference " + << criteria.toString() << " for set " << name); +} + +Status makeReplicaSetMonitorRemovedError(const std::string& name) { + return Status(ErrorCodes::ReplicaSetMonitorRemoved, + str::stream() << "ReplicaSetMonitor for set " << name << " is removed"); +} + +bool hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, + sdam::TopologyDescriptionPtr newDescription) { + if (oldDescription->getServers().size() != newDescription->getServers().size()) + return true; + + for (const auto& server : oldDescription->getServers()) { + const auto newServer = newDescription->findServerByAddress(server->getAddress()); + if (!newServer) + return true; + const ServerDescription& s = *server; + const ServerDescription& ns = **newServer; + if (s != ns) + return true; + } + + for (const auto& server : newDescription->getServers()) { + auto oldServer = oldDescription->findServerByAddress(server->getAddress()); + if (!oldServer) + return true; + } + + return false; +} + } // namespace @@ -243,8 +280,9 @@ void StreamableReplicaSetMonitor::drop() { } SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return getHostsOrRefresh(criteria, maxWait) + const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) { + + return getHostsOrRefresh(criteria, cancelToken) .thenRunOn(_executor) .then([self = shared_from_this()](const std::vector<HostAndPort>& result) { invariant(result.size()); @@ -263,14 +301,15 @@ std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts( } SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) { // In the fast case (stable topology), we avoid mutex acquisition. if (_isDropped.load()) { - return _makeReplicaSetMonitorRemovedError(); + return makeReplicaSetMonitorRemovedError(getName()); } // start counting from the beginning of the operation - const auto deadline = _executor->now() + ((maxWait > kZeroMs) ? maxWait : kZeroMs); + const auto deadline = _executor->now() + + duration_cast<Milliseconds>(ReplicaSetMonitorInterface::kDefaultFindHostTimeout); // try to satisfy query immediately auto immediateResult = _getHosts(criteria); @@ -286,21 +325,21 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr "replicaSet"_attr = getName(), "readPref"_attr = readPrefToStringFull(criteria)); - // fail fast on timeout + // Fail fast on timeout or cancelation. const Date_t& now = _executor->now(); - if (deadline <= now) { - return _makeUnsatisfiedReadPrefError(criteria); + if (deadline <= now || cancelToken.isCanceled()) { + return makeUnsatisfiedReadPrefError(getName(), criteria); } return _topologyManager->executeWithLock( - [this, criteria, deadline](const TopologyDescriptionPtr& topologyDescription) + [this, criteria, cancelToken, deadline](const TopologyDescriptionPtr& topologyDescription) -> SemiFuture<std::vector<HostAndPort>> { stdx::lock_guard lk(_mutex); // We check if we are closed under the mutex here since someone could have called // close() concurrently with the code above. if (_isDropped.load()) { - return _makeReplicaSetMonitorRemovedError(); + return makeReplicaSetMonitorRemovedError(getName()); } // try to satisfy the query again while holding both the StreamableRSM mutex and // TopologyManager mutex to avoid missing any topology change that has occurred @@ -310,64 +349,65 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr return {*immediateResult}; } - return _enqueueOutstandingQuery(lk, criteria, deadline); + return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline); }); } SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery( - WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) { - using HostAndPortList = std::vector<HostAndPort>; - Future<HostAndPortList> result; + WithLock, + const ReadPreferenceSetting& criteria, + const CancelationToken& cancelToken, + const Date_t& deadline) { auto query = std::make_shared<HostQuery>(); query->criteria = criteria; - query->deadline = deadline; - auto pf = makePromiseFuture<HostAndPortList>(); + auto pf = makePromiseFuture<std::vector<HostAndPort>>(); query->promise = std::move(pf.promise); - auto deadlineCb = - [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) { - stdx::lock_guard lock(_mutex); - if (query->done) { - return; - } + // Make the deadline task cancelable for when the query is satisfied or when the input + // cancelToken is canceled. + query->deadlineCancelSource = CancelationSource(cancelToken); + query->start = _executor->now(); - const auto cbStatus = cbArgs.status; - if (!cbStatus.isOK()) { - query->promise.setError(cbStatus); - query->done = true; - return; - } - - const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria); - query->promise.setError(errorStatus); - query->done = true; - LOGV2_INFO(4333208, - "RSM {replicaSet} host selection timeout: {error}", - "RSM host selection timeout", - "replicaSet"_attr = getName(), - "error"_attr = errorStatus.toString()); - }; - auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb); - - if (!swDeadlineHandle.isOK()) { - LOGV2_INFO(4333207, - "RSM {replicaSet} error scheduling deadline handler: {error}", - "RSM error scheduling deadline handler", - "replicaSet"_attr = getName(), - "error"_attr = swDeadlineHandle.getStatus()); - return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus()); - } - query->deadlineHandle = swDeadlineHandle.getValue(); - _outstandingQueries.push_back(query); + // Add the query to the list of outstanding queries. + auto queryIter = _outstandingQueries.insert(_outstandingQueries.end(), query); // Send topology changes to the query processor to satisfy the future. // It will be removed as a listener when all waiting queries have been satisfied. _eventsPublisher->registerListener(_queryProcessor); + // After a deadline or when the input cancelation token is canceled, cancel this query. If the + // query completes first, the deadlineCancelSource will be used to cancel this task. + _executor->sleepUntil(deadline, query->deadlineCancelSource.token()) + .getAsync([this, query, queryIter, self = shared_from_this(), cancelToken](Status status) { + // If the deadline was reached or cancelation occurred on the input cancelation token, + // mark the query as canceled. Otherwise, the deadlineCancelSource must have been + // canceled due to the query completing successfully. + if (status.isOK() || cancelToken.isCanceled()) { + auto errorStatus = makeUnsatisfiedReadPrefError(self->getName(), query->criteria); + // Mark query as done, and if it wasn't already done, remove it from the list of + // outstanding queries. + if (query->tryCancel(errorStatus)) { + LOGV2_INFO(4333208, + "RSM {replicaSet} host selection timeout: {error}", + "RSM host selection timeout", + "replicaSet"_attr = self->getName(), + "error"_attr = errorStatus.toString()); + + stdx::lock_guard lk(_mutex); + // Check that the RSM hasn't been dropped (and _outstandingQueries has not + // been cleared) before erasing. + if (!_isDropped.load()) { + invariant(_outstandingQueries.size() > 0); + _eraseQueryFromOutstandingQueries(lk, queryIter); + } + } + } + }); + return std::move(pf.future).semi(); -} // namespace mongo +} boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts( const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) { @@ -383,7 +423,7 @@ boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts } HostAndPort StreamableReplicaSetMonitor::getPrimaryOrUassert() { - return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); + return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get(); } sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() { @@ -599,7 +639,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( return; // Notify external components if there are membership changes in the topology. - if (_hasMembershipChange(previousDescription, newDescription)) { + if (hasMembershipChange(previousDescription, newDescription)) { LOGV2(4333213, "RSM {replicaSet} Topology Change: {newTopologyDescription}", "RSM Topology Change", @@ -693,39 +733,22 @@ std::string StreamableReplicaSetMonitor::_logPrefix() { void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) { for (const auto& query : _outstandingQueries) { - if (query->done) - continue; - - query->done = true; - _executor->cancel(query->deadlineHandle); - query->promise.setError(status); + (void)query->tryCancel(status); } _outstandingQueries.clear(); } -bool StreamableReplicaSetMonitor::_hasMembershipChange( - sdam::TopologyDescriptionPtr oldDescription, sdam::TopologyDescriptionPtr newDescription) { - - if (oldDescription->getServers().size() != newDescription->getServers().size()) - return true; - - for (const auto& server : oldDescription->getServers()) { - const auto newServer = newDescription->findServerByAddress(server->getAddress()); - if (!newServer) - return true; - const ServerDescription& s = *server; - const ServerDescription& ns = **newServer; - if (s != ns) - return true; - } +std::list<StreamableReplicaSetMonitor::HostQueryPtr>::iterator +StreamableReplicaSetMonitor::_eraseQueryFromOutstandingQueries( + WithLock, std::list<HostQueryPtr>::iterator iter) { - for (const auto& server : newDescription->getServers()) { - auto oldServer = oldDescription->findServerByAddress(server->getAddress()); - if (!oldServer) - return true; + auto retVal = _outstandingQueries.erase(iter); + if (_outstandingQueries.size() == 0) { + // If there are no more outstanding queries, no need to listen for topology + // changes in this monitor. + _eventsPublisher->removeListener(_queryProcessor); } - - return false; + return retVal; } void StreamableReplicaSetMonitor::_processOutstanding( @@ -737,57 +760,52 @@ void StreamableReplicaSetMonitor::_processOutstanding( stdx::lock_guard lock(_mutex); - bool shouldRemove; auto it = _outstandingQueries.begin(); + bool hadUnresolvedQuery{false}; + + // Iterate through the outstanding queries and try to resolve them via calls to _getHosts. If we + // succeed in resolving a query, the query is removed from the list. If a query has already been + // canceled, or there are no results, it will be skipped. Cancelation logic elsewhere will + // handle removing the canceled queries from the list. while (it != _outstandingQueries.end()) { auto& query = *it; - shouldRemove = false; - if (query->done) { - shouldRemove = true; - } else { + // If query has not been canceled yet, try to satisfy it. + if (!query->hasBeenResolved()) { auto result = _getHosts(topologyDescription, query->criteria); if (result) { - _executor->cancel(query->deadlineHandle); - query->done = true; - query->promise.emplaceValue(std::move(*result)); - const auto latency = _executor->now() - query->start; - LOGV2_DEBUG(433214, - kLowerLogLevel, - "RSM {replicaSet} finished async getHosts: {readPref} ({duration})", - "RSM finished async getHosts", - "replicaSet"_attr = getName(), - "readPref"_attr = readPrefToStringFull(query->criteria), - "duration"_attr = Milliseconds(latency)); - shouldRemove = true; + if (query->tryResolveWithSuccess(std::move(*result))) { + const auto latency = _executor->now() - query->start; + LOGV2_DEBUG(433214, + kLowerLogLevel, + "RSM {replicaSet} finished async getHosts: {readPref} ({duration})", + "RSM finished async getHosts", + "replicaSet"_attr = getName(), + "readPref"_attr = readPrefToStringFull(query->criteria), + "duration"_attr = Milliseconds(latency)); + + it = _eraseQueryFromOutstandingQueries(lock, it); + } else { + // The query was canceled, so skip to the next entry without erasing it. + ++it; + } + } else { + // Results were not available, so skip to the next entry without erasing it. + ++it; + hadUnresolvedQuery = true; } + } else { + // The query was canceled, so skip to the next entry without erasing it. + ++it; } - - it = (shouldRemove) ? _outstandingQueries.erase(it) : ++it; } - if (_outstandingQueries.size()) { - // enable expedited mode + // If there remain unresolved queries, enable expedited mode. + if (hadUnresolvedQuery) { _serverDiscoveryMonitor->requestImmediateCheck(); - } else { - // if no more outstanding queries, no need to listen for topology changes in - // this monitor. - _eventsPublisher->removeListener(_queryProcessor); } } -Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError( - const ReadPreferenceSetting& criteria) const { - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "Could not find host matching read preference " - << criteria.toString() << " for set " << getName()); -} - -Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const { - return Status(ErrorCodes::ReplicaSetMonitorRemoved, - str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); -} - void StreamableReplicaSetMonitor::runScanForMockReplicaSet() { MONGO_UNREACHABLE; } |