From 322d6dc73aac61c675d00ee0e45bedecbc286f4d Mon Sep 17 00:00:00 2001 From: Matthew Saltz Date: Tue, 24 Nov 2020 22:38:35 +0000 Subject: SERVER-50657 Add CancelationToken support to the ReplicaSetMonitor --- src/mongo/client/dbclient_rs.cpp | 8 +- src/mongo/client/remote_command_targeter_rs.cpp | 13 +- .../replica_set_monitor_integration_test.cpp | 3 +- src/mongo/client/replica_set_monitor_interface.h | 7 +- src/mongo/client/scanning_replica_set_monitor.cpp | 11 +- src/mongo/client/scanning_replica_set_monitor.h | 18 +- .../client/streamable_replica_set_monitor.cpp | 252 +++++++++++---------- src/mongo/client/streamable_replica_set_monitor.h | 93 ++++++-- .../db/repl/tenant_migration_recipient_service.cpp | 15 +- 9 files changed, 252 insertions(+), 168 deletions(-) diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 7d398bce6bb..acd2f116d7f 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -422,7 +422,10 @@ DBClientConnection& DBClientReplicaSet::secondaryConn() { bool DBClientReplicaSet::connect() { // Returns true if there are any up hosts. const ReadPreferenceSetting anyUpHost(ReadPreference::Nearest, TagSet()); - return _getMonitor()->getHostOrRefresh(anyUpHost).getNoThrow().isOK(); + return _getMonitor() + ->getHostOrRefresh(anyUpHost, CancelationToken::uncancelable()) + .getNoThrow() + .isOK(); } template @@ -759,7 +762,8 @@ DBClientConnection* DBClientReplicaSet::selectNodeUsingTags( ReplicaSetMonitorPtr monitor = _getMonitor(); - auto selectedNodeStatus = monitor->getHostOrRefresh(*readPref).getNoThrow(); + auto selectedNodeStatus = + monitor->getHostOrRefresh(*readPref, CancelationToken::uncancelable()).getNoThrow(); if (!selectedNodeStatus.isOK()) { LOGV2_DEBUG(20138, 3, diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp index 7ddd9898769..f95f4c4fc55 100644 --- a/src/mongo/client/remote_command_targeter_rs.cpp +++ b/src/mongo/client/remote_command_targeter_rs.cpp @@ -66,12 +66,14 @@ ConnectionString RemoteCommandTargeterRS::connectionString() { SemiFuture RemoteCommandTargeterRS::findHostWithMaxWait( const ReadPreferenceSetting& readPref, Milliseconds maxWait) { - return _rsMonitor->getHostOrRefresh(readPref, maxWait); + // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter API. + return _rsMonitor->getHostOrRefresh(readPref, CancelationToken::uncancelable()); } SemiFuture> RemoteCommandTargeterRS::findHostsWithMaxWait( const ReadPreferenceSetting& readPref, Milliseconds maxWait) { - return _rsMonitor->getHostsOrRefresh(readPref, maxWait); + // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter API. + return _rsMonitor->getHostsOrRefresh(readPref, CancelationToken::uncancelable()); } StatusWith RemoteCommandTargeterRS::findHost(OperationContext* opCtx, @@ -85,11 +87,10 @@ StatusWith RemoteCommandTargeterRS::findHost(OperationContext* opCt // behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable. // See comment in remote_command_targeter.h for details. bool maxTimeMsLesser = (opCtx->getRemainingMaxTimeMillis() < Milliseconds(Seconds(20))); + // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter. In this case + // we would pass the CancelationToken attached to the OperationContext to getHostOrRefresh. 
     auto swHostAndPort =
-        _rsMonitor
-            ->getHostOrRefresh(
-                readPref, std::min(opCtx->getRemainingMaxTimeMillis(), Milliseconds(Seconds(20))))
-            .getNoThrow(opCtx);
+        _rsMonitor->getHostOrRefresh(readPref, CancelationToken::uncancelable()).getNoThrow(opCtx);
 
     if (maxTimeMsLesser && swHostAndPort.getStatus() == ErrorCodes::FailedToSatisfyReadPreference) {
         return Status(ErrorCodes::MaxTimeMSExpired, "operation timed out");
diff --git a/src/mongo/client/replica_set_monitor_integration_test.cpp b/src/mongo/client/replica_set_monitor_integration_test.cpp
index fc3b40168f2..37fa291557b 100644
--- a/src/mongo/client/replica_set_monitor_integration_test.cpp
+++ b/src/mongo/client/replica_set_monitor_integration_test.cpp
@@ -155,7 +155,8 @@ TEST_F(ReplicaSetMonitorFixture, StreamableRSMWireVersion) {
 
     // Schedule isMaster requests and wait for the responses.
     auto primaryFuture =
-        rsm->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::PrimaryOnly));
+        rsm->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::PrimaryOnly),
+                              CancelationToken::uncancelable());
     primaryFuture.get();
 
     ASSERT_EQ(rsm->getMinWireVersion(), WireVersion::LATEST_WIRE_VERSION);
diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h
index 741f2e30af4..7faacadf9c1 100644
--- a/src/mongo/client/replica_set_monitor_interface.h
+++ b/src/mongo/client/replica_set_monitor_interface.h
@@ -36,6 +36,7 @@
 
 #include "mongo/client/mongo_uri.h"
 #include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/util/cancelation.h"
 #include "mongo/util/duration.h"
 #include "mongo/util/net/hostandport.h"
 #include "mongo/util/time_support.h"
@@ -78,11 +79,11 @@ public:
      * Known errors are:
      *  FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference.
      */
-    virtual SemiFuture<HostAndPort> getHostOrRefresh(
-        const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+    virtual SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
+                                                     const CancelationToken& cancelToken) = 0;
 
     virtual SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
-        const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+        const ReadPreferenceSetting& readPref, const CancelationToken& cancelToken) = 0;
 
     /**
      * Returns the host the RSM thinks is the current primary or uasserts.
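
A caller-side usage sketch (illustrative only, not part of this patch): with maxWait gone from the
interface above, a caller that still wants a bounded wait can tie a deadline to a CancelationSource,
the same pattern this patch applies in tenant_migration_recipient_service.cpp further down. The
names executor, rsm, and readPref are placeholders, not identifiers from this patch.

    CancelationSource deadlineSource;
    executor->sleepUntil(executor->now() + ReplicaSetMonitorInterface::kDefaultFindHostTimeout,
                         CancelationToken::uncancelable())
        .getAsync([deadlineSource](Status) mutable { deadlineSource.cancel(); });
    auto hostFuture = rsm->getHostOrRefresh(readPref, deadlineSource.token());
    // For the StreamableReplicaSetMonitor, a canceled token fails the outstanding query with
    // FailedToSatisfyReadPreference; the ScanningReplicaSetMonitor ignores the token.
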
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp index 651c3459c02..fa825fcbe7f 100644 --- a/src/mongo/client/scanning_replica_set_monitor.cpp +++ b/src/mongo/client/scanning_replica_set_monitor.cpp @@ -304,8 +304,8 @@ void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy s } SemiFuture ScanningReplicaSetMonitor::getHostOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait) + const ReadPreferenceSetting& criteria, const CancelationToken&) { + return _getHostsOrRefresh(criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout) .then([](const auto& hosts) { invariant(hosts.size()); return hosts[0]; @@ -314,8 +314,8 @@ SemiFuture ScanningReplicaSetMonitor::getHostOrRefresh( } SemiFuture> ScanningReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait).semi(); + const ReadPreferenceSetting& criteria, const CancelationToken&) { + return _getHostsOrRefresh(criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout).semi(); } Future> ScanningReplicaSetMonitor::_getHostsOrRefresh( @@ -352,8 +352,9 @@ Future> ScanningReplicaSetMonitor::_getHostsOrRefresh( return std::move(pf.future); } + HostAndPort ScanningReplicaSetMonitor::getPrimaryOrUassert() { - return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); + return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get(); } void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h index 573b052fb53..c8107329c24 100644 --- a/src/mongo/client/scanning_replica_set_monitor.h +++ b/src/mongo/client/scanning_replica_set_monitor.h @@ -71,13 +71,19 @@ public: void drop() override; - SemiFuture getHostOrRefresh( - const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout) override; + /** + * NOTE: Cancelation via CancelationTokens is not implemented for the ScanningReplicaSetMonitor, + * so any token passed in will be ignored. + */ + SemiFuture getHostOrRefresh(const ReadPreferenceSetting& readPref, + const CancelationToken&) override; - SemiFuture> getHostsOrRefresh( - const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout) override; + /** + * NOTE: Cancelation via CancelationTokens is not implemented for the ScanningReplicaSetMonitor, + * so any token passed in will be ignored. 
+ */ + SemiFuture> getHostsOrRefresh(const ReadPreferenceSetting& readPref, + const CancelationToken&) override; HostAndPort getPrimaryOrUassert() override; diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 76cbfbedba6..a41e1ffa73e 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -118,6 +118,43 @@ double pingTimeMillis(const ServerDescriptionPtr& serverDescription) { } constexpr auto kZeroMs = Milliseconds(0); + +Status makeUnsatisfiedReadPrefError(const std::string& name, + const ReadPreferenceSetting& criteria) { + return Status(ErrorCodes::FailedToSatisfyReadPreference, + str::stream() << "Could not find host matching read preference " + << criteria.toString() << " for set " << name); +} + +Status makeReplicaSetMonitorRemovedError(const std::string& name) { + return Status(ErrorCodes::ReplicaSetMonitorRemoved, + str::stream() << "ReplicaSetMonitor for set " << name << " is removed"); +} + +bool hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, + sdam::TopologyDescriptionPtr newDescription) { + if (oldDescription->getServers().size() != newDescription->getServers().size()) + return true; + + for (const auto& server : oldDescription->getServers()) { + const auto newServer = newDescription->findServerByAddress(server->getAddress()); + if (!newServer) + return true; + const ServerDescription& s = *server; + const ServerDescription& ns = **newServer; + if (s != ns) + return true; + } + + for (const auto& server : newDescription->getServers()) { + auto oldServer = oldDescription->findServerByAddress(server->getAddress()); + if (!oldServer) + return true; + } + + return false; +} + } // namespace @@ -243,8 +280,9 @@ void StreamableReplicaSetMonitor::drop() { } SemiFuture StreamableReplicaSetMonitor::getHostOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return getHostsOrRefresh(criteria, maxWait) + const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) { + + return getHostsOrRefresh(criteria, cancelToken) .thenRunOn(_executor) .then([self = shared_from_this()](const std::vector& result) { invariant(result.size()); @@ -263,14 +301,15 @@ std::vector StreamableReplicaSetMonitor::_extractHosts( } SemiFuture> StreamableReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) { // In the fast case (stable topology), we avoid mutex acquisition. if (_isDropped.load()) { - return _makeReplicaSetMonitorRemovedError(); + return makeReplicaSetMonitorRemovedError(getName()); } // start counting from the beginning of the operation - const auto deadline = _executor->now() + ((maxWait > kZeroMs) ? maxWait : kZeroMs); + const auto deadline = _executor->now() + + duration_cast(ReplicaSetMonitorInterface::kDefaultFindHostTimeout); // try to satisfy query immediately auto immediateResult = _getHosts(criteria); @@ -286,21 +325,21 @@ SemiFuture> StreamableReplicaSetMonitor::getHostsOrRefr "replicaSet"_attr = getName(), "readPref"_attr = readPrefToStringFull(criteria)); - // fail fast on timeout + // Fail fast on timeout or cancelation. 
const Date_t& now = _executor->now(); - if (deadline <= now) { - return _makeUnsatisfiedReadPrefError(criteria); + if (deadline <= now || cancelToken.isCanceled()) { + return makeUnsatisfiedReadPrefError(getName(), criteria); } return _topologyManager->executeWithLock( - [this, criteria, deadline](const TopologyDescriptionPtr& topologyDescription) + [this, criteria, cancelToken, deadline](const TopologyDescriptionPtr& topologyDescription) -> SemiFuture> { stdx::lock_guard lk(_mutex); // We check if we are closed under the mutex here since someone could have called // close() concurrently with the code above. if (_isDropped.load()) { - return _makeReplicaSetMonitorRemovedError(); + return makeReplicaSetMonitorRemovedError(getName()); } // try to satisfy the query again while holding both the StreamableRSM mutex and // TopologyManager mutex to avoid missing any topology change that has occurred @@ -310,64 +349,65 @@ SemiFuture> StreamableReplicaSetMonitor::getHostsOrRefr return {*immediateResult}; } - return _enqueueOutstandingQuery(lk, criteria, deadline); + return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline); }); } SemiFuture> StreamableReplicaSetMonitor::_enqueueOutstandingQuery( - WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) { - using HostAndPortList = std::vector; - Future result; + WithLock, + const ReadPreferenceSetting& criteria, + const CancelationToken& cancelToken, + const Date_t& deadline) { auto query = std::make_shared(); query->criteria = criteria; - query->deadline = deadline; - auto pf = makePromiseFuture(); + auto pf = makePromiseFuture>(); query->promise = std::move(pf.promise); - auto deadlineCb = - [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) { - stdx::lock_guard lock(_mutex); - if (query->done) { - return; - } + // Make the deadline task cancelable for when the query is satisfied or when the input + // cancelToken is canceled. + query->deadlineCancelSource = CancelationSource(cancelToken); + query->start = _executor->now(); - const auto cbStatus = cbArgs.status; - if (!cbStatus.isOK()) { - query->promise.setError(cbStatus); - query->done = true; - return; - } - - const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria); - query->promise.setError(errorStatus); - query->done = true; - LOGV2_INFO(4333208, - "RSM {replicaSet} host selection timeout: {error}", - "RSM host selection timeout", - "replicaSet"_attr = getName(), - "error"_attr = errorStatus.toString()); - }; - auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb); - - if (!swDeadlineHandle.isOK()) { - LOGV2_INFO(4333207, - "RSM {replicaSet} error scheduling deadline handler: {error}", - "RSM error scheduling deadline handler", - "replicaSet"_attr = getName(), - "error"_attr = swDeadlineHandle.getStatus()); - return SemiFuture::makeReady(swDeadlineHandle.getStatus()); - } - query->deadlineHandle = swDeadlineHandle.getValue(); - _outstandingQueries.push_back(query); + // Add the query to the list of outstanding queries. + auto queryIter = _outstandingQueries.insert(_outstandingQueries.end(), query); // Send topology changes to the query processor to satisfy the future. // It will be removed as a listener when all waiting queries have been satisfied. _eventsPublisher->registerListener(_queryProcessor); + // After a deadline or when the input cancelation token is canceled, cancel this query. If the + // query completes first, the deadlineCancelSource will be used to cancel this task. 
+ _executor->sleepUntil(deadline, query->deadlineCancelSource.token()) + .getAsync([this, query, queryIter, self = shared_from_this(), cancelToken](Status status) { + // If the deadline was reached or cancelation occurred on the input cancelation token, + // mark the query as canceled. Otherwise, the deadlineCancelSource must have been + // canceled due to the query completing successfully. + if (status.isOK() || cancelToken.isCanceled()) { + auto errorStatus = makeUnsatisfiedReadPrefError(self->getName(), query->criteria); + // Mark query as done, and if it wasn't already done, remove it from the list of + // outstanding queries. + if (query->tryCancel(errorStatus)) { + LOGV2_INFO(4333208, + "RSM {replicaSet} host selection timeout: {error}", + "RSM host selection timeout", + "replicaSet"_attr = self->getName(), + "error"_attr = errorStatus.toString()); + + stdx::lock_guard lk(_mutex); + // Check that the RSM hasn't been dropped (and _outstandingQueries has not + // been cleared) before erasing. + if (!_isDropped.load()) { + invariant(_outstandingQueries.size() > 0); + _eraseQueryFromOutstandingQueries(lk, queryIter); + } + } + } + }); + return std::move(pf.future).semi(); -} // namespace mongo +} boost::optional> StreamableReplicaSetMonitor::_getHosts( const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) { @@ -383,7 +423,7 @@ boost::optional> StreamableReplicaSetMonitor::_getHosts } HostAndPort StreamableReplicaSetMonitor::getPrimaryOrUassert() { - return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); + return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get(); } sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() { @@ -599,7 +639,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( return; // Notify external components if there are membership changes in the topology. 
- if (_hasMembershipChange(previousDescription, newDescription)) { + if (hasMembershipChange(previousDescription, newDescription)) { LOGV2(4333213, "RSM {replicaSet} Topology Change: {newTopologyDescription}", "RSM Topology Change", @@ -693,39 +733,22 @@ std::string StreamableReplicaSetMonitor::_logPrefix() { void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) { for (const auto& query : _outstandingQueries) { - if (query->done) - continue; - - query->done = true; - _executor->cancel(query->deadlineHandle); - query->promise.setError(status); + (void)query->tryCancel(status); } _outstandingQueries.clear(); } -bool StreamableReplicaSetMonitor::_hasMembershipChange( - sdam::TopologyDescriptionPtr oldDescription, sdam::TopologyDescriptionPtr newDescription) { - - if (oldDescription->getServers().size() != newDescription->getServers().size()) - return true; - - for (const auto& server : oldDescription->getServers()) { - const auto newServer = newDescription->findServerByAddress(server->getAddress()); - if (!newServer) - return true; - const ServerDescription& s = *server; - const ServerDescription& ns = **newServer; - if (s != ns) - return true; - } +std::list::iterator +StreamableReplicaSetMonitor::_eraseQueryFromOutstandingQueries( + WithLock, std::list::iterator iter) { - for (const auto& server : newDescription->getServers()) { - auto oldServer = oldDescription->findServerByAddress(server->getAddress()); - if (!oldServer) - return true; + auto retVal = _outstandingQueries.erase(iter); + if (_outstandingQueries.size() == 0) { + // If there are no more outstanding queries, no need to listen for topology + // changes in this monitor. + _eventsPublisher->removeListener(_queryProcessor); } - - return false; + return retVal; } void StreamableReplicaSetMonitor::_processOutstanding( @@ -737,57 +760,52 @@ void StreamableReplicaSetMonitor::_processOutstanding( stdx::lock_guard lock(_mutex); - bool shouldRemove; auto it = _outstandingQueries.begin(); + bool hadUnresolvedQuery{false}; + + // Iterate through the outstanding queries and try to resolve them via calls to _getHosts. If we + // succeed in resolving a query, the query is removed from the list. If a query has already been + // canceled, or there are no results, it will be skipped. Cancelation logic elsewhere will + // handle removing the canceled queries from the list. while (it != _outstandingQueries.end()) { auto& query = *it; - shouldRemove = false; - if (query->done) { - shouldRemove = true; - } else { + // If query has not been canceled yet, try to satisfy it. 
+ if (!query->hasBeenResolved()) { auto result = _getHosts(topologyDescription, query->criteria); if (result) { - _executor->cancel(query->deadlineHandle); - query->done = true; - query->promise.emplaceValue(std::move(*result)); - const auto latency = _executor->now() - query->start; - LOGV2_DEBUG(433214, - kLowerLogLevel, - "RSM {replicaSet} finished async getHosts: {readPref} ({duration})", - "RSM finished async getHosts", - "replicaSet"_attr = getName(), - "readPref"_attr = readPrefToStringFull(query->criteria), - "duration"_attr = Milliseconds(latency)); - shouldRemove = true; + if (query->tryResolveWithSuccess(std::move(*result))) { + const auto latency = _executor->now() - query->start; + LOGV2_DEBUG(433214, + kLowerLogLevel, + "RSM {replicaSet} finished async getHosts: {readPref} ({duration})", + "RSM finished async getHosts", + "replicaSet"_attr = getName(), + "readPref"_attr = readPrefToStringFull(query->criteria), + "duration"_attr = Milliseconds(latency)); + + it = _eraseQueryFromOutstandingQueries(lock, it); + } else { + // The query was canceled, so skip to the next entry without erasing it. + ++it; + } + } else { + // Results were not available, so skip to the next entry without erasing it. + ++it; + hadUnresolvedQuery = true; } + } else { + // The query was canceled, so skip to the next entry without erasing it. + ++it; } - - it = (shouldRemove) ? _outstandingQueries.erase(it) : ++it; } - if (_outstandingQueries.size()) { - // enable expedited mode + // If there remain unresolved queries, enable expedited mode. + if (hadUnresolvedQuery) { _serverDiscoveryMonitor->requestImmediateCheck(); - } else { - // if no more outstanding queries, no need to listen for topology changes in - // this monitor. - _eventsPublisher->removeListener(_queryProcessor); } } -Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError( - const ReadPreferenceSetting& criteria) const { - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "Could not find host matching read preference " - << criteria.toString() << " for set " << getName()); -} - -Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const { - return Status(ErrorCodes::ReplicaSetMonitorRemoved, - str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); -} - void StreamableReplicaSetMonitor::runScanForMockReplicaSet() { MONGO_UNREACHABLE; } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index fab0bc591fa..1f449b1282f 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -63,7 +63,7 @@ using ReplicaSetMonitorPtr = std::shared_ptr; * * All methods perform the required synchronization to allow callers from multiple threads. 
*/ -class StreamableReplicaSetMonitor +class StreamableReplicaSetMonitor final : public ReplicaSetMonitor, public sdam::TopologyListener, public std::enable_shared_from_this { @@ -92,12 +92,12 @@ public: std::shared_ptr connectionCloser); SemiFuture getHostOrRefresh(const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout); + const CancelationToken& cancelToken) override; SemiFuture> getHostsOrRefresh( - const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); + const ReadPreferenceSetting& readPref, const CancelationToken& cancelToken) override; - HostAndPort getPrimaryOrUassert(); + HostAndPort getPrimaryOrUassert() override; void failedHost(const HostAndPort& host, const Status& status) override; void failedHostPreHandshake(const HostAndPort& host, @@ -107,27 +107,27 @@ public: const Status& status, BSONObj bson) override; - bool isPrimary(const HostAndPort& host) const; + bool isPrimary(const HostAndPort& host) const override; - bool isHostUp(const HostAndPort& host) const; + bool isHostUp(const HostAndPort& host) const override; - int getMinWireVersion() const; + int getMinWireVersion() const override; - int getMaxWireVersion() const; + int getMaxWireVersion() const override; - std::string getName() const; + std::string getName() const override; - std::string getServerAddress() const; + std::string getServerAddress() const override; - const MongoURI& getOriginalUri() const; + const MongoURI& getOriginalUri() const override; sdam::TopologyEventsPublisherPtr getEventsPublisher(); - bool contains(const HostAndPort& server) const; + bool contains(const HostAndPort& server) const override; - void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const; + void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override; - bool isKnownToHaveGoodPrimary() const; + bool isKnownToHaveGoodPrimary() const override; void runScanForMockReplicaSet() override; private: @@ -136,13 +136,53 @@ private: std::shared_ptr; struct HostQuery { - Date_t deadline; - executor::TaskExecutor::CallbackHandle deadlineHandle; + ~HostQuery() { + invariant(hasBeenResolved()); + } + + bool hasBeenResolved() { + return done.load(); + } + + /** + * Tries to mark the query as done and resolve its promise with an error status, and returns + * whether or not it was able to do so. + */ + bool tryCancel(Status status) { + invariant(!status.isOK()); + auto wasAlreadyDone = done.swap(true); + if (!wasAlreadyDone) { + promise.setError(status); + deadlineCancelSource.cancel(); + } + return !wasAlreadyDone; + } + + /** + * Tries to mark the query as done and resolve its promise with a successful result, and + * returns whether or not it was able to do so. + */ + bool tryResolveWithSuccess(std::vector&& result) { + auto wasAlreadyDone = done.swap(true); + if (!wasAlreadyDone) { + promise.emplaceValue(std::move(result)); + deadlineCancelSource.cancel(); + } + return !wasAlreadyDone; + } + + CancelationSource deadlineCancelSource; + ReadPreferenceSetting criteria; - Date_t start = Date_t::now(); - bool done = false; + + // Used to compute latency. 
+ Date_t start; + + AtomicWord done{false}; + Promise> promise; }; + using HostQueryPtr = std::shared_ptr; // Information collected from the primary ServerDescription to be published via the @@ -154,7 +194,14 @@ private: }; SemiFuture> _enqueueOutstandingQuery( - WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline); + WithLock, + const ReadPreferenceSetting& criteria, + const CancelationToken& cancelToken, + const Date_t& deadline); + + // Removes the query pointed to by iter and returns an iterator to the next item in the list. + std::list::iterator _eraseQueryFromOutstandingQueries( + WithLock, std::list::iterator iter); std::vector _extractHosts( const std::vector& serverDescriptions); @@ -200,12 +247,8 @@ private: std::string _logPrefix(); void _failOutstandingWithStatus(WithLock, Status status); - bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, - sdam::TopologyDescriptionPtr newDescription); - void _setConfirmedNotifierState(WithLock, const ServerDescriptionPtr& primaryDescription); - Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const; - Status _makeReplicaSetMonitorRemovedError() const; + void _setConfirmedNotifierState(WithLock, const ServerDescriptionPtr& primaryDescription); // Try to satisfy the outstanding queries for this instance with the given topology information. void _processOutstanding(const TopologyDescriptionPtr& topologyDescription); @@ -241,7 +284,7 @@ private: AtomicWord _isDropped{true}; mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor"); - std::vector _outstandingQueries; + std::list _outstandingQueries; boost::optional _confirmedNotifierState; mutable PseudoRandom _random; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index fa926230adc..d13eb6c7f5a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -298,11 +298,20 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { stdx::lock_guard lk(_mutex); _donorReplicaSetMonitor = ReplicaSetMonitor::createIfNeeded( connectionString.getSetName(), std::set(servers.begin(), servers.end())); - Milliseconds findHostTimeout = ReplicaSetMonitorInterface::kDefaultFindHostTimeout; + + // Only ever used to cancel when the setTenantMigrationRecipientInstanceHostTimeout failpoint is + // set. + CancelationSource getHostCancelSource; setTenantMigrationRecipientInstanceHostTimeout.execute([&](const BSONObj& data) { - findHostTimeout = Milliseconds(data["findHostTimeoutMillis"].safeNumberLong()); + auto exec = **_scopedExecutor; + const auto deadline = + exec->now() + Milliseconds(data["findHostTimeoutMillis"].safeNumberLong()); + // Cancel the find host request after a timeout. Ignore callback handle. + exec->sleepUntil(deadline, CancelationToken::uncancelable()) + .getAsync([getHostCancelSource](auto) mutable { getHostCancelSource.cancel(); }); }); - return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, findHostTimeout) + + return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, getHostCancelSource.token()) .thenRunOn(**_scopedExecutor) .then([this](const HostAndPort& serverAddress) { // Application name is constructed such that it doesn't exceeds -- cgit v1.2.1
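
A complementary sketch of direct cancelation through the new API (illustrative names rsm, readPref,
and opCancelSource; not code from this patch):

    CancelationSource opCancelSource;
    auto hostsFuture = rsm->getHostsOrRefresh(readPref, opCancelSource.token());
    // ... later, when the calling operation is interrupted and no longer needs a host:
    opCancelSource.cancel();
    // Per the StreamableReplicaSetMonitor changes above, the canceled token resolves the
    // outstanding query with FailedToSatisfyReadPreference instead of waiting out the full
    // host-selection deadline.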