author     Matthew Saltz <matthew.saltz@mongodb.com>  2020-11-24 22:38:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-24 23:41:48 +0000
commit     322d6dc73aac61c675d00ee0e45bedecbc286f4d (patch)
tree       d237a402db1627f48af9566792004b6dbae624ee
parent     4057c9256996a67884cd5891bf8be596c0dd5742 (diff)
download   mongo-322d6dc73aac61c675d00ee0e45bedecbc286f4d.tar.gz
SERVER-50657 Add CancelationToken support to the ReplicaSetMonitor
-rw-r--r--  src/mongo/client/dbclient_rs.cpp                             8
-rw-r--r--  src/mongo/client/remote_command_targeter_rs.cpp             13
-rw-r--r--  src/mongo/client/replica_set_monitor_integration_test.cpp    3
-rw-r--r--  src/mongo/client/replica_set_monitor_interface.h             7
-rw-r--r--  src/mongo/client/scanning_replica_set_monitor.cpp           11
-rw-r--r--  src/mongo/client/scanning_replica_set_monitor.h             18
-rw-r--r--  src/mongo/client/streamable_replica_set_monitor.cpp        252
-rw-r--r--  src/mongo/client/streamable_replica_set_monitor.h           93
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp    15
9 files changed, 252 insertions, 168 deletions
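
For readers skimming the diff: the central interface change is that getHostOrRefresh()/getHostsOrRefresh() now take a CancelationToken instead of a Milliseconds maxWait. A minimal caller-side sketch follows (hypothetical function name; include paths assumed from this tree). Note that the StreamableReplicaSetMonitor honors the token, while the ScanningReplicaSetMonitor ignores it, per the header comments further down.

    // Hypothetical call site illustrating the new signature introduced by this commit.
    // Before: rsm->getHostOrRefresh(readPref, Milliseconds{20000});
    // After:  cancelation is expressed via a CancelationToken; callers that do not need
    //         cancelation pass CancelationToken::uncancelable(), as the updated call
    //         sites in this diff do.
    #include <memory>

    #include "mongo/client/read_preference.h"
    #include "mongo/client/replica_set_monitor.h"
    #include "mongo/util/cancelation.h"

    namespace mongo {
    SemiFuture<HostAndPort> findNearestHost(const std::shared_ptr<ReplicaSetMonitor>& rsm,
                                            const CancelationToken& cancelToken) {
        const ReadPreferenceSetting readPref(ReadPreference::Nearest, TagSet());
        // For the streamable monitor, canceling 'cancelToken' fails any outstanding query
        // with FailedToSatisfyReadPreference instead of leaving the caller waiting on the
        // monitor's internal timeout.
        return rsm->getHostOrRefresh(readPref, cancelToken);
    }
    }  // namespace mongo
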
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp
index 7d398bce6bb..acd2f116d7f 100644
--- a/src/mongo/client/dbclient_rs.cpp
+++ b/src/mongo/client/dbclient_rs.cpp
@@ -422,7 +422,10 @@ DBClientConnection& DBClientReplicaSet::secondaryConn() {
bool DBClientReplicaSet::connect() {
// Returns true if there are any up hosts.
const ReadPreferenceSetting anyUpHost(ReadPreference::Nearest, TagSet());
- return _getMonitor()->getHostOrRefresh(anyUpHost).getNoThrow().isOK();
+ return _getMonitor()
+ ->getHostOrRefresh(anyUpHost, CancelationToken::uncancelable())
+ .getNoThrow()
+ .isOK();
}
template <typename Authenticate>
@@ -759,7 +762,8 @@ DBClientConnection* DBClientReplicaSet::selectNodeUsingTags(
ReplicaSetMonitorPtr monitor = _getMonitor();
- auto selectedNodeStatus = monitor->getHostOrRefresh(*readPref).getNoThrow();
+ auto selectedNodeStatus =
+ monitor->getHostOrRefresh(*readPref, CancelationToken::uncancelable()).getNoThrow();
if (!selectedNodeStatus.isOK()) {
LOGV2_DEBUG(20138,
3,
diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp
index 7ddd9898769..f95f4c4fc55 100644
--- a/src/mongo/client/remote_command_targeter_rs.cpp
+++ b/src/mongo/client/remote_command_targeter_rs.cpp
@@ -66,12 +66,14 @@ ConnectionString RemoteCommandTargeterRS::connectionString() {
SemiFuture<HostAndPort> RemoteCommandTargeterRS::findHostWithMaxWait(
const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
- return _rsMonitor->getHostOrRefresh(readPref, maxWait);
+ // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter API.
+ return _rsMonitor->getHostOrRefresh(readPref, CancelationToken::uncancelable());
}
SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterRS::findHostsWithMaxWait(
const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
- return _rsMonitor->getHostsOrRefresh(readPref, maxWait);
+ // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter API.
+ return _rsMonitor->getHostsOrRefresh(readPref, CancelationToken::uncancelable());
}
StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCtx,
@@ -85,11 +87,10 @@ StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCt
// behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable.
// See comment in remote_command_targeter.h for details.
bool maxTimeMsLesser = (opCtx->getRemainingMaxTimeMillis() < Milliseconds(Seconds(20)));
+ // TODO (SERVER-51296): Add CancelationToken support to the RemoteCommandTargeter. In this case
+ // we would pass the CancelationToken attached to the OperationContext to getHostOrRefresh.
auto swHostAndPort =
- _rsMonitor
- ->getHostOrRefresh(
- readPref, std::min(opCtx->getRemainingMaxTimeMillis(), Milliseconds(Seconds(20))))
- .getNoThrow(opCtx);
+ _rsMonitor->getHostOrRefresh(readPref, CancelationToken::uncancelable()).getNoThrow(opCtx);
if (maxTimeMsLesser && swHostAndPort.getStatus() == ErrorCodes::FailedToSatisfyReadPreference) {
return Status(ErrorCodes::MaxTimeMSExpired, "operation timed out");
diff --git a/src/mongo/client/replica_set_monitor_integration_test.cpp b/src/mongo/client/replica_set_monitor_integration_test.cpp
index fc3b40168f2..37fa291557b 100644
--- a/src/mongo/client/replica_set_monitor_integration_test.cpp
+++ b/src/mongo/client/replica_set_monitor_integration_test.cpp
@@ -155,7 +155,8 @@ TEST_F(ReplicaSetMonitorFixture, StreamableRSMWireVersion) {
// Schedule isMaster requests and wait for the responses.
auto primaryFuture =
- rsm->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::PrimaryOnly));
+ rsm->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::PrimaryOnly),
+ CancelationToken::uncancelable());
primaryFuture.get();
ASSERT_EQ(rsm->getMinWireVersion(), WireVersion::LATEST_WIRE_VERSION);
diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h
index 741f2e30af4..7faacadf9c1 100644
--- a/src/mongo/client/replica_set_monitor_interface.h
+++ b/src/mongo/client/replica_set_monitor_interface.h
@@ -36,6 +36,7 @@
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/util/cancelation.h"
#include "mongo/util/duration.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -78,11 +79,11 @@ public:
* Known errors are:
* FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference.
*/
- virtual SemiFuture<HostAndPort> getHostOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+ virtual SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
+ const CancelationToken& cancelToken) = 0;
virtual SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+ const ReadPreferenceSetting& readPref, const CancelationToken& cancelToken) = 0;
/**
* Returns the host the RSM thinks is the current primary or uasserts.
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp
index 651c3459c02..fa825fcbe7f 100644
--- a/src/mongo/client/scanning_replica_set_monitor.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor.cpp
@@ -304,8 +304,8 @@ void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy s
}
SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait)
+ const ReadPreferenceSetting& criteria, const CancelationToken&) {
+ return _getHostsOrRefresh(criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout)
.then([](const auto& hosts) {
invariant(hosts.size());
return hosts[0];
@@ -314,8 +314,8 @@ SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh(
}
SemiFuture<std::vector<HostAndPort>> ScanningReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait).semi();
+ const ReadPreferenceSetting& criteria, const CancelationToken&) {
+ return _getHostsOrRefresh(criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout).semi();
}
Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh(
@@ -352,8 +352,9 @@ Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh(
return std::move(pf.future);
}
+
HostAndPort ScanningReplicaSetMonitor::getPrimaryOrUassert() {
- return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
+ return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get();
}
void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h
index 573b052fb53..c8107329c24 100644
--- a/src/mongo/client/scanning_replica_set_monitor.h
+++ b/src/mongo/client/scanning_replica_set_monitor.h
@@ -71,13 +71,19 @@ public:
void drop() override;
- SemiFuture<HostAndPort> getHostOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+ /**
+ * NOTE: Cancelation via CancelationTokens is not implemented for the ScanningReplicaSetMonitor,
+ * so any token passed in will be ignored.
+ */
+ SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
+ const CancelationToken&) override;
- SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+ /**
+ * NOTE: Cancelation via CancelationTokens is not implemented for the ScanningReplicaSetMonitor,
+ * so any token passed in will be ignored.
+ */
+ SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(const ReadPreferenceSetting& readPref,
+ const CancelationToken&) override;
HostAndPort getPrimaryOrUassert() override;
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 76cbfbedba6..a41e1ffa73e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -118,6 +118,43 @@ double pingTimeMillis(const ServerDescriptionPtr& serverDescription) {
}
constexpr auto kZeroMs = Milliseconds(0);
+
+Status makeUnsatisfiedReadPrefError(const std::string& name,
+ const ReadPreferenceSetting& criteria) {
+ return Status(ErrorCodes::FailedToSatisfyReadPreference,
+ str::stream() << "Could not find host matching read preference "
+ << criteria.toString() << " for set " << name);
+}
+
+Status makeReplicaSetMonitorRemovedError(const std::string& name) {
+ return Status(ErrorCodes::ReplicaSetMonitorRemoved,
+ str::stream() << "ReplicaSetMonitor for set " << name << " is removed");
+}
+
+bool hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
+ sdam::TopologyDescriptionPtr newDescription) {
+ if (oldDescription->getServers().size() != newDescription->getServers().size())
+ return true;
+
+ for (const auto& server : oldDescription->getServers()) {
+ const auto newServer = newDescription->findServerByAddress(server->getAddress());
+ if (!newServer)
+ return true;
+ const ServerDescription& s = *server;
+ const ServerDescription& ns = **newServer;
+ if (s != ns)
+ return true;
+ }
+
+ for (const auto& server : newDescription->getServers()) {
+ auto oldServer = oldDescription->findServerByAddress(server->getAddress());
+ if (!oldServer)
+ return true;
+ }
+
+ return false;
+}
+
} // namespace
@@ -243,8 +280,9 @@ void StreamableReplicaSetMonitor::drop() {
}
SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return getHostsOrRefresh(criteria, maxWait)
+ const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) {
+
+ return getHostsOrRefresh(criteria, cancelToken)
.thenRunOn(_executor)
.then([self = shared_from_this()](const std::vector<HostAndPort>& result) {
invariant(result.size());
@@ -263,14 +301,15 @@ std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts(
}
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+ const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) {
// In the fast case (stable topology), we avoid mutex acquisition.
if (_isDropped.load()) {
- return _makeReplicaSetMonitorRemovedError();
+ return makeReplicaSetMonitorRemovedError(getName());
}
// start counting from the beginning of the operation
- const auto deadline = _executor->now() + ((maxWait > kZeroMs) ? maxWait : kZeroMs);
+ const auto deadline = _executor->now() +
+ duration_cast<Milliseconds>(ReplicaSetMonitorInterface::kDefaultFindHostTimeout);
// try to satisfy query immediately
auto immediateResult = _getHosts(criteria);
@@ -286,21 +325,21 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr
"replicaSet"_attr = getName(),
"readPref"_attr = readPrefToStringFull(criteria));
- // fail fast on timeout
+ // Fail fast on timeout or cancelation.
const Date_t& now = _executor->now();
- if (deadline <= now) {
- return _makeUnsatisfiedReadPrefError(criteria);
+ if (deadline <= now || cancelToken.isCanceled()) {
+ return makeUnsatisfiedReadPrefError(getName(), criteria);
}
return _topologyManager->executeWithLock(
- [this, criteria, deadline](const TopologyDescriptionPtr& topologyDescription)
+ [this, criteria, cancelToken, deadline](const TopologyDescriptionPtr& topologyDescription)
-> SemiFuture<std::vector<HostAndPort>> {
stdx::lock_guard lk(_mutex);
// We check if we are closed under the mutex here since someone could have called
// close() concurrently with the code above.
if (_isDropped.load()) {
- return _makeReplicaSetMonitorRemovedError();
+ return makeReplicaSetMonitorRemovedError(getName());
}
// try to satisfy the query again while holding both the StreamableRSM mutex and
// TopologyManager mutex to avoid missing any topology change that has occurred
@@ -310,64 +349,65 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr
return {*immediateResult};
}
- return _enqueueOutstandingQuery(lk, criteria, deadline);
+ return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline);
});
}
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery(
- WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) {
- using HostAndPortList = std::vector<HostAndPort>;
- Future<HostAndPortList> result;
+ WithLock,
+ const ReadPreferenceSetting& criteria,
+ const CancelationToken& cancelToken,
+ const Date_t& deadline) {
auto query = std::make_shared<HostQuery>();
query->criteria = criteria;
- query->deadline = deadline;
- auto pf = makePromiseFuture<HostAndPortList>();
+ auto pf = makePromiseFuture<std::vector<HostAndPort>>();
query->promise = std::move(pf.promise);
- auto deadlineCb =
- [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) {
- stdx::lock_guard lock(_mutex);
- if (query->done) {
- return;
- }
+ // Make the deadline task cancelable for when the query is satisfied or when the input
+ // cancelToken is canceled.
+ query->deadlineCancelSource = CancelationSource(cancelToken);
+ query->start = _executor->now();
- const auto cbStatus = cbArgs.status;
- if (!cbStatus.isOK()) {
- query->promise.setError(cbStatus);
- query->done = true;
- return;
- }
-
- const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria);
- query->promise.setError(errorStatus);
- query->done = true;
- LOGV2_INFO(4333208,
- "RSM {replicaSet} host selection timeout: {error}",
- "RSM host selection timeout",
- "replicaSet"_attr = getName(),
- "error"_attr = errorStatus.toString());
- };
- auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb);
-
- if (!swDeadlineHandle.isOK()) {
- LOGV2_INFO(4333207,
- "RSM {replicaSet} error scheduling deadline handler: {error}",
- "RSM error scheduling deadline handler",
- "replicaSet"_attr = getName(),
- "error"_attr = swDeadlineHandle.getStatus());
- return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus());
- }
- query->deadlineHandle = swDeadlineHandle.getValue();
- _outstandingQueries.push_back(query);
+ // Add the query to the list of outstanding queries.
+ auto queryIter = _outstandingQueries.insert(_outstandingQueries.end(), query);
// Send topology changes to the query processor to satisfy the future.
// It will be removed as a listener when all waiting queries have been satisfied.
_eventsPublisher->registerListener(_queryProcessor);
+ // After a deadline or when the input cancelation token is canceled, cancel this query. If the
+ // query completes first, the deadlineCancelSource will be used to cancel this task.
+ _executor->sleepUntil(deadline, query->deadlineCancelSource.token())
+ .getAsync([this, query, queryIter, self = shared_from_this(), cancelToken](Status status) {
+ // If the deadline was reached or cancelation occurred on the input cancelation token,
+ // mark the query as canceled. Otherwise, the deadlineCancelSource must have been
+ // canceled due to the query completing successfully.
+ if (status.isOK() || cancelToken.isCanceled()) {
+ auto errorStatus = makeUnsatisfiedReadPrefError(self->getName(), query->criteria);
+ // Mark query as done, and if it wasn't already done, remove it from the list of
+ // outstanding queries.
+ if (query->tryCancel(errorStatus)) {
+ LOGV2_INFO(4333208,
+ "RSM {replicaSet} host selection timeout: {error}",
+ "RSM host selection timeout",
+ "replicaSet"_attr = self->getName(),
+ "error"_attr = errorStatus.toString());
+
+ stdx::lock_guard lk(_mutex);
+ // Check that the RSM hasn't been dropped (and _outstandingQueries has not
+ // been cleared) before erasing.
+ if (!_isDropped.load()) {
+ invariant(_outstandingQueries.size() > 0);
+ _eraseQueryFromOutstandingQueries(lk, queryIter);
+ }
+ }
+ }
+ });
+
return std::move(pf.future).semi();
-} // namespace mongo
+}
boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts(
const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) {
@@ -383,7 +423,7 @@ boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts
}
HostAndPort StreamableReplicaSetMonitor::getPrimaryOrUassert() {
- return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
+ return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get();
}
sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() {
@@ -599,7 +639,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
return;
// Notify external components if there are membership changes in the topology.
- if (_hasMembershipChange(previousDescription, newDescription)) {
+ if (hasMembershipChange(previousDescription, newDescription)) {
LOGV2(4333213,
"RSM {replicaSet} Topology Change: {newTopologyDescription}",
"RSM Topology Change",
@@ -693,39 +733,22 @@ std::string StreamableReplicaSetMonitor::_logPrefix() {
void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) {
for (const auto& query : _outstandingQueries) {
- if (query->done)
- continue;
-
- query->done = true;
- _executor->cancel(query->deadlineHandle);
- query->promise.setError(status);
+ (void)query->tryCancel(status);
}
_outstandingQueries.clear();
}
-bool StreamableReplicaSetMonitor::_hasMembershipChange(
- sdam::TopologyDescriptionPtr oldDescription, sdam::TopologyDescriptionPtr newDescription) {
-
- if (oldDescription->getServers().size() != newDescription->getServers().size())
- return true;
-
- for (const auto& server : oldDescription->getServers()) {
- const auto newServer = newDescription->findServerByAddress(server->getAddress());
- if (!newServer)
- return true;
- const ServerDescription& s = *server;
- const ServerDescription& ns = **newServer;
- if (s != ns)
- return true;
- }
+std::list<StreamableReplicaSetMonitor::HostQueryPtr>::iterator
+StreamableReplicaSetMonitor::_eraseQueryFromOutstandingQueries(
+ WithLock, std::list<HostQueryPtr>::iterator iter) {
- for (const auto& server : newDescription->getServers()) {
- auto oldServer = oldDescription->findServerByAddress(server->getAddress());
- if (!oldServer)
- return true;
+ auto retVal = _outstandingQueries.erase(iter);
+ if (_outstandingQueries.size() == 0) {
+ // If there are no more outstanding queries, no need to listen for topology
+ // changes in this monitor.
+ _eventsPublisher->removeListener(_queryProcessor);
}
-
- return false;
+ return retVal;
}
void StreamableReplicaSetMonitor::_processOutstanding(
@@ -737,57 +760,52 @@ void StreamableReplicaSetMonitor::_processOutstanding(
stdx::lock_guard lock(_mutex);
- bool shouldRemove;
auto it = _outstandingQueries.begin();
+ bool hadUnresolvedQuery{false};
+
+ // Iterate through the outstanding queries and try to resolve them via calls to _getHosts. If we
+ // succeed in resolving a query, the query is removed from the list. If a query has already been
+ // canceled, or there are no results, it will be skipped. Cancelation logic elsewhere will
+ // handle removing the canceled queries from the list.
while (it != _outstandingQueries.end()) {
auto& query = *it;
- shouldRemove = false;
- if (query->done) {
- shouldRemove = true;
- } else {
+ // If query has not been canceled yet, try to satisfy it.
+ if (!query->hasBeenResolved()) {
auto result = _getHosts(topologyDescription, query->criteria);
if (result) {
- _executor->cancel(query->deadlineHandle);
- query->done = true;
- query->promise.emplaceValue(std::move(*result));
- const auto latency = _executor->now() - query->start;
- LOGV2_DEBUG(433214,
- kLowerLogLevel,
- "RSM {replicaSet} finished async getHosts: {readPref} ({duration})",
- "RSM finished async getHosts",
- "replicaSet"_attr = getName(),
- "readPref"_attr = readPrefToStringFull(query->criteria),
- "duration"_attr = Milliseconds(latency));
- shouldRemove = true;
+ if (query->tryResolveWithSuccess(std::move(*result))) {
+ const auto latency = _executor->now() - query->start;
+ LOGV2_DEBUG(433214,
+ kLowerLogLevel,
+ "RSM {replicaSet} finished async getHosts: {readPref} ({duration})",
+ "RSM finished async getHosts",
+ "replicaSet"_attr = getName(),
+ "readPref"_attr = readPrefToStringFull(query->criteria),
+ "duration"_attr = Milliseconds(latency));
+
+ it = _eraseQueryFromOutstandingQueries(lock, it);
+ } else {
+ // The query was canceled, so skip to the next entry without erasing it.
+ ++it;
+ }
+ } else {
+ // Results were not available, so skip to the next entry without erasing it.
+ ++it;
+ hadUnresolvedQuery = true;
}
+ } else {
+ // The query was canceled, so skip to the next entry without erasing it.
+ ++it;
}
-
- it = (shouldRemove) ? _outstandingQueries.erase(it) : ++it;
}
- if (_outstandingQueries.size()) {
- // enable expedited mode
+ // If there remain unresolved queries, enable expedited mode.
+ if (hadUnresolvedQuery) {
_serverDiscoveryMonitor->requestImmediateCheck();
- } else {
- // if no more outstanding queries, no need to listen for topology changes in
- // this monitor.
- _eventsPublisher->removeListener(_queryProcessor);
}
}
-Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError(
- const ReadPreferenceSetting& criteria) const {
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "Could not find host matching read preference "
- << criteria.toString() << " for set " << getName());
-}
-
-Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const {
- return Status(ErrorCodes::ReplicaSetMonitorRemoved,
- str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
-}
-
void StreamableReplicaSetMonitor::runScanForMockReplicaSet() {
MONGO_UNREACHABLE;
}
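
The refactored HostQuery (see the header diff below) replaces the mutex-protected done flag and deadline CallbackHandle with an atomic flag plus a per-query CancelationSource; tryCancel() and tryResolveWithSuccess() can then race safely between the deadline/cancelation path and the topology-change path. A standalone sketch of that "first settler wins" pattern, using plain std:: types rather than the mongo classes:

    // Illustration only: an atomic exchange on 'done' guarantees that exactly one of the
    // racing paths (deadline/cancelation vs. a topology change that satisfies the query)
    // settles the promise; the loser observes 'true' was already set and does nothing.
    #include <atomic>
    #include <future>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    struct HostQuerySketch {
        std::atomic<bool> done{false};
        std::promise<std::vector<std::string>> promise;

        bool tryCancel(const std::string& reason) {
            if (done.exchange(true))
                return false;  // Already resolved by the other path.
            promise.set_exception(std::make_exception_ptr(std::runtime_error(reason)));
            return true;
        }

        bool tryResolveWithSuccess(std::vector<std::string> hosts) {
            if (done.exchange(true))
                return false;
            promise.set_value(std::move(hosts));
            return true;
        }
    };

In the real HostQuery the winning path additionally cancels the query's deadlineCancelSource, so the pending sleepUntil task is torn down as soon as the query is settled.
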
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index fab0bc591fa..1f449b1282f 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -63,7 +63,7 @@ using ReplicaSetMonitorPtr = std::shared_ptr<ReplicaSetMonitor>;
*
* All methods perform the required synchronization to allow callers from multiple threads.
*/
-class StreamableReplicaSetMonitor
+class StreamableReplicaSetMonitor final
: public ReplicaSetMonitor,
public sdam::TopologyListener,
public std::enable_shared_from_this<StreamableReplicaSetMonitor> {
@@ -92,12 +92,12 @@ public:
std::shared_ptr<executor::EgressTagCloser> connectionCloser);
SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout);
+ const CancelationToken& cancelToken) override;
SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout);
+ const ReadPreferenceSetting& readPref, const CancelationToken& cancelToken) override;
- HostAndPort getPrimaryOrUassert();
+ HostAndPort getPrimaryOrUassert() override;
void failedHost(const HostAndPort& host, const Status& status) override;
void failedHostPreHandshake(const HostAndPort& host,
@@ -107,27 +107,27 @@ public:
const Status& status,
BSONObj bson) override;
- bool isPrimary(const HostAndPort& host) const;
+ bool isPrimary(const HostAndPort& host) const override;
- bool isHostUp(const HostAndPort& host) const;
+ bool isHostUp(const HostAndPort& host) const override;
- int getMinWireVersion() const;
+ int getMinWireVersion() const override;
- int getMaxWireVersion() const;
+ int getMaxWireVersion() const override;
- std::string getName() const;
+ std::string getName() const override;
- std::string getServerAddress() const;
+ std::string getServerAddress() const override;
- const MongoURI& getOriginalUri() const;
+ const MongoURI& getOriginalUri() const override;
sdam::TopologyEventsPublisherPtr getEventsPublisher();
- bool contains(const HostAndPort& server) const;
+ bool contains(const HostAndPort& server) const override;
- void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const;
+ void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
- bool isKnownToHaveGoodPrimary() const;
+ bool isKnownToHaveGoodPrimary() const override;
void runScanForMockReplicaSet() override;
private:
@@ -136,13 +136,53 @@ private:
std::shared_ptr<StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor>;
struct HostQuery {
- Date_t deadline;
- executor::TaskExecutor::CallbackHandle deadlineHandle;
+ ~HostQuery() {
+ invariant(hasBeenResolved());
+ }
+
+ bool hasBeenResolved() {
+ return done.load();
+ }
+
+ /**
+ * Tries to mark the query as done and resolve its promise with an error status, and returns
+ * whether or not it was able to do so.
+ */
+ bool tryCancel(Status status) {
+ invariant(!status.isOK());
+ auto wasAlreadyDone = done.swap(true);
+ if (!wasAlreadyDone) {
+ promise.setError(status);
+ deadlineCancelSource.cancel();
+ }
+ return !wasAlreadyDone;
+ }
+
+ /**
+ * Tries to mark the query as done and resolve its promise with a successful result, and
+ * returns whether or not it was able to do so.
+ */
+ bool tryResolveWithSuccess(std::vector<HostAndPort>&& result) {
+ auto wasAlreadyDone = done.swap(true);
+ if (!wasAlreadyDone) {
+ promise.emplaceValue(std::move(result));
+ deadlineCancelSource.cancel();
+ }
+ return !wasAlreadyDone;
+ }
+
+ CancelationSource deadlineCancelSource;
+
ReadPreferenceSetting criteria;
- Date_t start = Date_t::now();
- bool done = false;
+
+ // Used to compute latency.
+ Date_t start;
+
+ AtomicWord<bool> done{false};
+
Promise<std::vector<HostAndPort>> promise;
};
+
using HostQueryPtr = std::shared_ptr<HostQuery>;
// Information collected from the primary ServerDescription to be published via the
@@ -154,7 +194,14 @@ private:
};
SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery(
- WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline);
+ WithLock,
+ const ReadPreferenceSetting& criteria,
+ const CancelationToken& cancelToken,
+ const Date_t& deadline);
+
+ // Removes the query pointed to by iter and returns an iterator to the next item in the list.
+ std::list<HostQueryPtr>::iterator _eraseQueryFromOutstandingQueries(
+ WithLock, std::list<HostQueryPtr>::iterator iter);
std::vector<HostAndPort> _extractHosts(
const std::vector<sdam::ServerDescriptionPtr>& serverDescriptions);
@@ -200,12 +247,8 @@ private:
std::string _logPrefix();
void _failOutstandingWithStatus(WithLock, Status status);
- bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
- sdam::TopologyDescriptionPtr newDescription);
- void _setConfirmedNotifierState(WithLock, const ServerDescriptionPtr& primaryDescription);
- Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const;
- Status _makeReplicaSetMonitorRemovedError() const;
+ void _setConfirmedNotifierState(WithLock, const ServerDescriptionPtr& primaryDescription);
// Try to satisfy the outstanding queries for this instance with the given topology information.
void _processOutstanding(const TopologyDescriptionPtr& topologyDescription);
@@ -241,7 +284,7 @@ private:
AtomicWord<bool> _isDropped{true};
mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor");
- std::vector<HostQueryPtr> _outstandingQueries;
+ std::list<HostQueryPtr> _outstandingQueries;
boost::optional<ChangeNotifierState> _confirmedNotifierState;
mutable PseudoRandom _random;
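
The tenant migration change below no longer passes a findHostTimeout to the monitor; when the setTenantMigrationRecipientInstanceHostTimeout failpoint supplies findHostTimeoutMillis, that deadline is expressed as a cancelation instead. A sketch of the pattern (helper name is hypothetical; executor and cancelation calls as used elsewhere in this diff):

    // Illustrative helper mirroring tenant_migration_recipient_service.cpp below: schedule
    // a cancelable sleep and cancel a CancelationSource when it fires, then hand that
    // source's token to getHostOrRefresh so the lookup gives up after the deadline.
    #include <memory>

    #include "mongo/executor/task_executor.h"
    #include "mongo/util/cancelation.h"
    #include "mongo/util/time_support.h"

    namespace mongo {
    CancelationToken makeDeadlineToken(const std::shared_ptr<executor::TaskExecutor>& exec,
                                       Date_t deadline) {
        CancelationSource source;
        // The lambda keeps a copy of 'source' alive; once the sleep resolves (or errors),
        // the source is canceled and every holder of its token sees isCanceled() == true.
        exec->sleepUntil(deadline, CancelationToken::uncancelable())
            .getAsync([source](Status) mutable { source.cancel(); });
        return source.token();
    }
    }  // namespace mongo

Inside the monitor itself the per-query deadlineCancelSource is built from the caller's token (CancelationSource(cancelToken)), so either the deadline firing or a cancelation of the caller's token ends the outstanding query.
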
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index fa926230adc..d13eb6c7f5a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -298,11 +298,20 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
stdx::lock_guard lk(_mutex);
_donorReplicaSetMonitor = ReplicaSetMonitor::createIfNeeded(
connectionString.getSetName(), std::set<HostAndPort>(servers.begin(), servers.end()));
- Milliseconds findHostTimeout = ReplicaSetMonitorInterface::kDefaultFindHostTimeout;
+
+ // Only ever used to cancel when the setTenantMigrationRecipientInstanceHostTimeout failpoint is
+ // set.
+ CancelationSource getHostCancelSource;
setTenantMigrationRecipientInstanceHostTimeout.execute([&](const BSONObj& data) {
- findHostTimeout = Milliseconds(data["findHostTimeoutMillis"].safeNumberLong());
+ auto exec = **_scopedExecutor;
+ const auto deadline =
+ exec->now() + Milliseconds(data["findHostTimeoutMillis"].safeNumberLong());
+ // Cancel the find host request after a timeout. Ignore callback handle.
+ exec->sleepUntil(deadline, CancelationToken::uncancelable())
+ .getAsync([getHostCancelSource](auto) mutable { getHostCancelSource.cancel(); });
});
- return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, findHostTimeout)
+
+ return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, getHostCancelSource.token())
.thenRunOn(**_scopedExecutor)
.then([this](const HostAndPort& serverAddress) {
// Application name is constructed such that it doesn't exceeds