Diffstat (limited to 'src/mongo/client/streamable_replica_set_monitor.cpp')
 src/mongo/client/streamable_replica_set_monitor.cpp | 252 ++++++++++--------
 1 file changed, 135 insertions(+), 117 deletions(-)
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 76cbfbedba6..a41e1ffa73e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -118,6 +118,43 @@ double pingTimeMillis(const ServerDescriptionPtr& serverDescription) {
}
constexpr auto kZeroMs = Milliseconds(0);
+
+Status makeUnsatisfiedReadPrefError(const std::string& name,
+ const ReadPreferenceSetting& criteria) {
+ return Status(ErrorCodes::FailedToSatisfyReadPreference,
+ str::stream() << "Could not find host matching read preference "
+ << criteria.toString() << " for set " << name);
+}
+
+Status makeReplicaSetMonitorRemovedError(const std::string& name) {
+ return Status(ErrorCodes::ReplicaSetMonitorRemoved,
+ str::stream() << "ReplicaSetMonitor for set " << name << " is removed");
+}
+
+bool hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
+ sdam::TopologyDescriptionPtr newDescription) {
+ if (oldDescription->getServers().size() != newDescription->getServers().size())
+ return true;
+
+ for (const auto& server : oldDescription->getServers()) {
+ const auto newServer = newDescription->findServerByAddress(server->getAddress());
+ if (!newServer)
+ return true;
+ const ServerDescription& s = *server;
+ const ServerDescription& ns = **newServer;
+ if (s != ns)
+ return true;
+ }
+
+ for (const auto& server : newDescription->getServers()) {
+ auto oldServer = oldDescription->findServerByAddress(server->getAddress());
+ if (!oldServer)
+ return true;
+ }
+
+ return false;
+}
+
} // namespace
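
These helpers replace private member functions with file-local free functions (the removed member definitions appear further down in this diff). A minimal standalone sketch of the same two-way membership comparison, using std::map as a stand-in for the sdam server list (illustrative types only, not the real sdam API):

    #include <map>
    #include <string>

    // Report a membership change if any server was added, removed, or had its
    // description change between the old and new topology snapshots.
    bool membershipChanged(const std::map<std::string, std::string>& oldServers,
                           const std::map<std::string, std::string>& newServers) {
        if (oldServers.size() != newServers.size())
            return true;  // net addition or removal
        for (const auto& [address, description] : oldServers) {
            auto it = newServers.find(address);
            if (it == newServers.end())
                return true;  // server removed
            if (it->second != description)
                return true;  // same address, changed description
        }
        // Equal sizes plus every old address still present implies no additions.
        return false;
    }
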
@@ -243,8 +280,9 @@ void StreamableReplicaSetMonitor::drop() {
}
SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return getHostsOrRefresh(criteria, maxWait)
+ const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) {
+
+ return getHostsOrRefresh(criteria, cancelToken)
.thenRunOn(_executor)
.then([self = shared_from_this()](const std::vector<HostAndPort>& result) {
invariant(result.size());
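
Host selection is now bounded by a CancelationToken instead of a maxWait duration. A hedged caller-side sketch ('monitor' and the exact call shape are illustrative; CancelationSource and CancelationToken are the utilities this patch builds on):

    // Assume 'monitor' is a std::shared_ptr<StreamableReplicaSetMonitor>.
    CancelationSource cancelSource;
    SemiFuture<HostAndPort> hostFuture =
        monitor->getHostOrRefresh(kPrimaryOnlyReadPreference, cancelSource.token());

    // The caller, not the monitor, now decides when to stop waiting:
    cancelSource.cancel();

    // Callers that must wait indefinitely pass an uncancelable token, as
    // getPrimaryOrUassert() does later in this patch:
    auto host = monitor->getHostOrRefresh(kPrimaryOnlyReadPreference,
                                          CancelationToken::uncancelable())
                    .get();
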
@@ -263,14 +301,15 @@ std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts(
}
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+ const ReadPreferenceSetting& criteria, const CancelationToken& cancelToken) {
// In the fast case (stable topology), we avoid mutex acquisition.
if (_isDropped.load()) {
- return _makeReplicaSetMonitorRemovedError();
+ return makeReplicaSetMonitorRemovedError(getName());
}
// start counting from the beginning of the operation
- const auto deadline = _executor->now() + ((maxWait > kZeroMs) ? maxWait : kZeroMs);
+ const auto deadline = _executor->now() +
+ duration_cast<Milliseconds>(ReplicaSetMonitorInterface::kDefaultFindHostTimeout);
// try to satisfy query immediately
auto immediateResult = _getHosts(criteria);
@@ -286,21 +325,21 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr
"replicaSet"_attr = getName(),
"readPref"_attr = readPrefToStringFull(criteria));
- // fail fast on timeout
+ // Fail fast on timeout or cancelation.
const Date_t& now = _executor->now();
- if (deadline <= now) {
- return _makeUnsatisfiedReadPrefError(criteria);
+ if (deadline <= now || cancelToken.isCanceled()) {
+ return makeUnsatisfiedReadPrefError(getName(), criteria);
}
return _topologyManager->executeWithLock(
- [this, criteria, deadline](const TopologyDescriptionPtr& topologyDescription)
+ [this, criteria, cancelToken, deadline](const TopologyDescriptionPtr& topologyDescription)
-> SemiFuture<std::vector<HostAndPort>> {
stdx::lock_guard lk(_mutex);
// We check if we are closed under the mutex here since someone could have called
// close() concurrently with the code above.
if (_isDropped.load()) {
- return _makeReplicaSetMonitorRemovedError();
+ return makeReplicaSetMonitorRemovedError(getName());
}
// try to satisfy the query again while holding both the StreamableRSM mutex and
// TopologyManager mutex to avoid missing any topology change that has occurred
@@ -310,64 +349,65 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr
return {*immediateResult};
}
- return _enqueueOutstandingQuery(lk, criteria, deadline);
+ return _enqueueOutstandingQuery(lk, criteria, cancelToken, deadline);
});
}
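
Note that the drop check runs twice by design: once lock-free on the fast path, and again under _mutex inside executeWithLock, because drop() may run between the two checks. A standalone sketch of that check-then-recheck shape, using standard types rather than the monitor's internals:

    #include <atomic>
    #include <mutex>
    #include <optional>

    std::atomic<bool> isDropped{false};
    std::mutex mu;

    std::optional<int> fastPathThenLocked() {
        if (isDropped.load())  // fast path: no mutex acquisition
            return std::nullopt;
        std::lock_guard lk(mu);
        if (isDropped.load())  // re-check: drop() may have run in between
            return std::nullopt;
        return 42;  // safe to touch lock-protected state here
    }
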
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery(
- WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) {
- using HostAndPortList = std::vector<HostAndPort>;
- Future<HostAndPortList> result;
+ WithLock,
+ const ReadPreferenceSetting& criteria,
+ const CancelationToken& cancelToken,
+ const Date_t& deadline) {
auto query = std::make_shared<HostQuery>();
query->criteria = criteria;
- query->deadline = deadline;
- auto pf = makePromiseFuture<HostAndPortList>();
+ auto pf = makePromiseFuture<std::vector<HostAndPort>>();
query->promise = std::move(pf.promise);
- auto deadlineCb =
- [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) {
- stdx::lock_guard lock(_mutex);
- if (query->done) {
- return;
- }
+    // Make the deadline task cancelable: it is canceled either when the query is
+    // satisfied first or when the input cancelToken is canceled.
+ query->deadlineCancelSource = CancelationSource(cancelToken);
+ query->start = _executor->now();
- const auto cbStatus = cbArgs.status;
- if (!cbStatus.isOK()) {
- query->promise.setError(cbStatus);
- query->done = true;
- return;
- }
-
- const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria);
- query->promise.setError(errorStatus);
- query->done = true;
- LOGV2_INFO(4333208,
- "RSM {replicaSet} host selection timeout: {error}",
- "RSM host selection timeout",
- "replicaSet"_attr = getName(),
- "error"_attr = errorStatus.toString());
- };
- auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb);
-
- if (!swDeadlineHandle.isOK()) {
- LOGV2_INFO(4333207,
- "RSM {replicaSet} error scheduling deadline handler: {error}",
- "RSM error scheduling deadline handler",
- "replicaSet"_attr = getName(),
- "error"_attr = swDeadlineHandle.getStatus());
- return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus());
- }
- query->deadlineHandle = swDeadlineHandle.getValue();
- _outstandingQueries.push_back(query);
+ // Add the query to the list of outstanding queries.
+ auto queryIter = _outstandingQueries.insert(_outstandingQueries.end(), query);
// Send topology changes to the query processor to satisfy the future.
// It will be removed as a listener when all waiting queries have been satisfied.
_eventsPublisher->registerListener(_queryProcessor);
+ // After a deadline or when the input cancelation token is canceled, cancel this query. If the
+ // query completes first, the deadlineCancelSource will be used to cancel this task.
+ _executor->sleepUntil(deadline, query->deadlineCancelSource.token())
+ .getAsync([this, query, queryIter, self = shared_from_this(), cancelToken](Status status) {
+ // If the deadline was reached or cancelation occurred on the input cancelation token,
+ // mark the query as canceled. Otherwise, the deadlineCancelSource must have been
+ // canceled due to the query completing successfully.
+ if (status.isOK() || cancelToken.isCanceled()) {
+ auto errorStatus = makeUnsatisfiedReadPrefError(self->getName(), query->criteria);
+ // Mark query as done, and if it wasn't already done, remove it from the list of
+ // outstanding queries.
+ if (query->tryCancel(errorStatus)) {
+ LOGV2_INFO(4333208,
+ "RSM {replicaSet} host selection timeout: {error}",
+ "RSM host selection timeout",
+ "replicaSet"_attr = self->getName(),
+ "error"_attr = errorStatus.toString());
+
+ stdx::lock_guard lk(_mutex);
+ // Check that the RSM hasn't been dropped (and _outstandingQueries has not
+ // been cleared) before erasing.
+ if (!_isDropped.load()) {
+ invariant(_outstandingQueries.size() > 0);
+ _eraseQueryFromOutstandingQueries(lk, queryIter);
+ }
+ }
+ }
+ });
+
return std::move(pf.future).semi();
-} // namespace mongo
+}
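
The deadlineCancelSource above is constructed from the input token, giving the query a child cancelation source: canceling the parent token cancels the child, and the child can also be canceled on its own once the query resolves. A standalone approximation using C++20 std::stop_token (ChildStopSource is a hypothetical name; in the real code the CancelationSource(cancelToken) constructor provides this linkage):

    #include <functional>
    #include <stop_token>

    class ChildStopSource {
    public:
        explicit ChildStopSource(std::stop_token parent)
            : _onParentStop(std::move(parent), [this] { _source.request_stop(); }) {}

        std::stop_token token() const { return _source.get_token(); }

        // Called when the query resolves first, so the deadline task stops waiting.
        void cancel() { _source.request_stop(); }

    private:
        // _source is declared first: if 'parent' is already stopped, the callback
        // runs inside the constructor and must find _source initialized.
        std::stop_source _source;
        std::stop_callback<std::function<void()>> _onParentStop;
    };
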
boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts(
const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) {
@@ -383,7 +423,7 @@ boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts
}
HostAndPort StreamableReplicaSetMonitor::getPrimaryOrUassert() {
- return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
+ return getHostOrRefresh(kPrimaryOnlyReadPreference, CancelationToken::uncancelable()).get();
}
sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() {
@@ -599,7 +639,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
return;
// Notify external components if there are membership changes in the topology.
- if (_hasMembershipChange(previousDescription, newDescription)) {
+ if (hasMembershipChange(previousDescription, newDescription)) {
LOGV2(4333213,
"RSM {replicaSet} Topology Change: {newTopologyDescription}",
"RSM Topology Change",
@@ -693,39 +733,22 @@ std::string StreamableReplicaSetMonitor::_logPrefix() {
void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) {
for (const auto& query : _outstandingQueries) {
- if (query->done)
- continue;
-
- query->done = true;
- _executor->cancel(query->deadlineHandle);
- query->promise.setError(status);
+ (void)query->tryCancel(status);
}
_outstandingQueries.clear();
}
-bool StreamableReplicaSetMonitor::_hasMembershipChange(
- sdam::TopologyDescriptionPtr oldDescription, sdam::TopologyDescriptionPtr newDescription) {
-
- if (oldDescription->getServers().size() != newDescription->getServers().size())
- return true;
-
- for (const auto& server : oldDescription->getServers()) {
- const auto newServer = newDescription->findServerByAddress(server->getAddress());
- if (!newServer)
- return true;
- const ServerDescription& s = *server;
- const ServerDescription& ns = **newServer;
- if (s != ns)
- return true;
- }
+std::list<StreamableReplicaSetMonitor::HostQueryPtr>::iterator
+StreamableReplicaSetMonitor::_eraseQueryFromOutstandingQueries(
+ WithLock, std::list<HostQueryPtr>::iterator iter) {
- for (const auto& server : newDescription->getServers()) {
- auto oldServer = oldDescription->findServerByAddress(server->getAddress());
- if (!oldServer)
- return true;
+ auto retVal = _outstandingQueries.erase(iter);
+ if (_outstandingQueries.size() == 0) {
+ // If there are no more outstanding queries, no need to listen for topology
+ // changes in this monitor.
+ _eventsPublisher->removeListener(_queryProcessor);
}
-
- return false;
+ return retVal;
}
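
tryCancel(), tryResolveWithSuccess(), and hasBeenResolved() are new HostQuery helpers whose definitions live in the header, which is not part of this diff. A hedged sketch of the resolve-once contract they appear to implement (all member names and types below are assumptions drawn from the call sites in this file):

    struct HostQuery {
        // Whichever path flips 'done' first settles the promise; the loser no-ops.
        bool tryCancel(Status error) {
            if (done.swap(true))
                return false;  // lost the race to a successful resolution
            deadlineCancelSource.cancel();  // tear down the pending deadline sleep
            promise.setError(std::move(error));
            return true;
        }

        bool tryResolveWithSuccess(std::vector<HostAndPort> hosts) {
            if (done.swap(true))
                return false;  // lost the race to a cancelation
            deadlineCancelSource.cancel();
            promise.emplaceValue(std::move(hosts));
            return true;
        }

        bool hasBeenResolved() const {
            return done.load();
        }

        AtomicWord<bool> done{false};
        ReadPreferenceSetting criteria;
        Date_t start;
        CancelationSource deadlineCancelSource;
        Promise<std::vector<HostAndPort>> promise;
    };
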
void StreamableReplicaSetMonitor::_processOutstanding(
@@ -737,57 +760,52 @@ void StreamableReplicaSetMonitor::_processOutstanding(
stdx::lock_guard lock(_mutex);
- bool shouldRemove;
auto it = _outstandingQueries.begin();
+ bool hadUnresolvedQuery{false};
+
+ // Iterate through the outstanding queries and try to resolve them via calls to _getHosts. If we
+ // succeed in resolving a query, the query is removed from the list. If a query has already been
+ // canceled, or there are no results, it will be skipped. Cancelation logic elsewhere will
+ // handle removing the canceled queries from the list.
while (it != _outstandingQueries.end()) {
auto& query = *it;
- shouldRemove = false;
- if (query->done) {
- shouldRemove = true;
- } else {
+ // If query has not been canceled yet, try to satisfy it.
+ if (!query->hasBeenResolved()) {
auto result = _getHosts(topologyDescription, query->criteria);
if (result) {
- _executor->cancel(query->deadlineHandle);
- query->done = true;
- query->promise.emplaceValue(std::move(*result));
- const auto latency = _executor->now() - query->start;
- LOGV2_DEBUG(433214,
- kLowerLogLevel,
- "RSM {replicaSet} finished async getHosts: {readPref} ({duration})",
- "RSM finished async getHosts",
- "replicaSet"_attr = getName(),
- "readPref"_attr = readPrefToStringFull(query->criteria),
- "duration"_attr = Milliseconds(latency));
- shouldRemove = true;
+ if (query->tryResolveWithSuccess(std::move(*result))) {
+ const auto latency = _executor->now() - query->start;
+ LOGV2_DEBUG(433214,
+ kLowerLogLevel,
+ "RSM {replicaSet} finished async getHosts: {readPref} ({duration})",
+ "RSM finished async getHosts",
+ "replicaSet"_attr = getName(),
+ "readPref"_attr = readPrefToStringFull(query->criteria),
+ "duration"_attr = Milliseconds(latency));
+
+ it = _eraseQueryFromOutstandingQueries(lock, it);
+ } else {
+ // The query was canceled, so skip to the next entry without erasing it.
+ ++it;
+ }
+ } else {
+ // Results were not available, so skip to the next entry without erasing it.
+ ++it;
+ hadUnresolvedQuery = true;
}
+ } else {
+ // The query was canceled, so skip to the next entry without erasing it.
+ ++it;
}
-
- it = (shouldRemove) ? _outstandingQueries.erase(it) : ++it;
}
- if (_outstandingQueries.size()) {
- // enable expedited mode
+ // If there remain unresolved queries, enable expedited mode.
+ if (hadUnresolvedQuery) {
_serverDiscoveryMonitor->requestImmediateCheck();
- } else {
- // if no more outstanding queries, no need to listen for topology changes in
- // this monitor.
- _eventsPublisher->removeListener(_queryProcessor);
}
}
-Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError(
- const ReadPreferenceSetting& criteria) const {
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "Could not find host matching read preference "
- << criteria.toString() << " for set " << getName());
-}
-
-Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const {
- return Status(ErrorCodes::ReplicaSetMonitorRemoved,
- str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
-}
-
void StreamableReplicaSetMonitor::runScanForMockReplicaSet() {
MONGO_UNREACHABLE;
}