diff options
author | Jason Carey <jcarey@argv.me> | 2019-06-06 13:49:44 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-06-07 15:54:40 -0400 |
commit | 4fc54eade0028a9976f6c2556fa7c169f66f2ba0 (patch) | |
tree | 050c7009729d3c68810b993010ad9dc0dc0526f5 | |
parent | 13f72f733b6d71f78155f77b4b84a559a2627ff5 (diff) | |
download | mongo-4fc54eade0028a9976f6c2556fa7c169f66f2ba0.tar.gz |
SERVER-41132 Opportunistic targeting for ARS
Inside the ARS, support use of the
TaskExecutor::scheduleRemoteCommandOnAny method, which will allow the
ARS to use any acceptable host returned from targeting, rather than
requiring us to use one at random.
This should allow us to prefer routing requests to hosts to which we
already have ready connections, or to which we can establish ready
connections faster.
(cherry picked from commit 8eed2fe5376ff6843ab1fe7881c8377812215185)
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 84 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 18 |
2 files changed, 66 insertions, 36 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 7b7a14c1cd3..86fa3af6300 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -33,6 +33,8 @@ #include "mongo/s/async_requests_sender.h" +#include <fmt/format.h> + #include "mongo/client/remote_command_targeter.h" #include "mongo/executor/remote_command_request.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -45,6 +47,8 @@ #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" +using namespace fmt::literals; + namespace mongo { namespace { @@ -140,21 +144,31 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() { void AsyncRequestsSender::RemoteData::executeRequest() { scheduleRequest() .thenRunOn(*_ars->_subBaton) - .getAsync([this](StatusWith<executor::RemoteCommandResponse> rcr) { + .getAsync([this](StatusWith<RemoteCommandOnAnyCallbackArgs> rcr) { _done = true; - _ars->_responseQueue.push({std::move(_shardId), rcr, std::move(_shardHostAndPort)}); + if (rcr.isOK()) { + _ars->_responseQueue.push( + {std::move(_shardId), rcr.getValue().response, std::move(_shardHostAndPort)}); + } else { + _ars->_responseQueue.push( + {std::move(_shardId), rcr.getStatus(), std::move(_shardHostAndPort)}); + } }); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRequest() { - return resolveShardIdToHostAndPort(_ars->_readPreference) +auto AsyncRequestsSender::RemoteData::scheduleRequest() + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + return resolveShardIdToHostAndPorts(_ars->_readPreference) .thenRunOn(*_ars->_subBaton) - .then([this](auto&& hostAndPort) { return scheduleRemoteCommand(std::move(hostAndPort)); }) + .then([this](auto&& hostAndPorts) { + _shardHostAndPort.emplace(hostAndPorts.front()); + return scheduleRemoteCommand(std::move(hostAndPorts)); + }) .then([this](auto&& rcr) { return handleResponse(std::move(rcr)); }) .semi(); } -SemiFuture<HostAndPort> 
AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( +SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts( const ReadPreferenceSetting& readPref) { const auto shard = getShard(); if (!shard) { @@ -162,45 +176,45 @@ SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAnd str::stream() << "Could not find shard " << _shardId); } - return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)); + return shard->getTargeter()->findHostsWithMaxWait(readPref, Seconds(20)); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRemoteCommand( - HostAndPort&& hostAndPort) { - _shardHostAndPort = std::move(hostAndPort); - - executor::RemoteCommandRequest request( - *_shardHostAndPort, _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx); +auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts) + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + executor::RemoteCommandRequestOnAny request( + std::move(hostAndPorts), _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx); // We have to make a promise future pair because the TaskExecutor doesn't currently support a // future returning variant of scheduleRemoteCommand - auto[p, f] = makePromiseFuture<executor::RemoteCommandResponse>(); + auto[p, f] = makePromiseFuture<RemoteCommandOnAnyCallbackArgs>(); // Failures to schedule skip the retry loop - uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommand( + uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommandOnAny( request, // We have to make a shared_ptr<Promise> here because scheduleRemoteCommand requires // copyable callbacks - [p = std::make_shared<Promise<executor::RemoteCommandResponse>>(std::move(p))]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - p->emplaceValue(cbData.response); - }, + [p = std::make_shared<Promise<RemoteCommandOnAnyCallbackArgs>>(std::move(p))]( + const 
RemoteCommandOnAnyCallbackArgs& cbData) { p->emplaceValue(cbData); }, *_ars->_subBaton)); return std::move(f).semi(); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::handleResponse( - executor::RemoteCommandResponse&& rcr) { - auto status = rcr.status; +auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackArgs&& rcr) + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + if (rcr.response.target) { + _shardHostAndPort = rcr.response.target; + } + + auto status = rcr.response.status; if (status.isOK()) { - status = getStatusFromCommandResult(rcr.data); + status = getStatusFromCommandResult(rcr.response.data); } if (status.isOK()) { - status = getWriteConcernStatusFromCommandResult(rcr.data); + status = getWriteConcernStatusFromCommandResult(rcr.response.data); } // If we're okay (RemoteCommandResponse, command result and write concern)-wise we're done. @@ -214,14 +228,26 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han if (!shard) { uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId); } else { - if (_shardHostAndPort) { - shard->updateReplSetMonitor(*_shardHostAndPort, status); + std::vector<HostAndPort> failedTargets; + + if (rcr.response.target) { + failedTargets = {*rcr.response.target}; + } else { + failedTargets = rcr.request.target; } + + shard->updateReplSetMonitor(failedTargets.front(), status); + if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) && _retryCount < kMaxNumFailedHostRetryAttempts) { - LOG(1) << "Command to remote " << _shardId << " at host " << *_shardHostAndPort - << " failed with retriable error and will be retried " + + LOG(1) << "Command to remote " << _shardId + << (failedTargets.empty() ? " " : (failedTargets.size() > 1 ? 
" for hosts " + : " at host ")) + << "{}"_format(fmt::join(failedTargets, ", ")) + << "failed with retriable error and will be retried " << causedBy(redact(status)); + ++_retryCount; _shardHostAndPort.reset(); // retry through recursion @@ -230,7 +256,7 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han } // Status' in the response.status field that aren't retried get converted to top level errors - uassertStatusOK(rcr.status); + uassertStatusOK(rcr.response.status); // We're not okay (on the remote), but still not going to retry return std::move(rcr); diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 86387d01bbe..95c457e2e7f 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -168,6 +168,9 @@ private: */ class RemoteData { public: + using RemoteCommandOnAnyCallbackArgs = + executor::TaskExecutor::RemoteCommandOnAnyCallbackArgs; + /** * Creates a new uninitialized remote state with a command to send. */ @@ -210,24 +213,25 @@ private: * * for the given shard. */ - SemiFuture<executor::RemoteCommandResponse> scheduleRequest(); + SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRequest(); /** - * Given a read preference, selects a host on which the command should be run. + * Given a read preference, selects a lists of hosts on which the command can run. 
*/ - SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); + SemiFuture<std::vector<HostAndPort>> resolveShardIdToHostAndPorts( + const ReadPreferenceSetting& readPref); /** * Schedules the remote command on the ARS's TaskExecutor */ - SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand( - HostAndPort&& hostAndPort); + SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRemoteCommand( + std::vector<HostAndPort>&& hostAndPort); /** * Handles the remote response */ - SemiFuture<executor::RemoteCommandResponse> handleResponse( - executor::RemoteCommandResponse&& rcr); + SemiFuture<RemoteCommandOnAnyCallbackArgs> handleResponse( + RemoteCommandOnAnyCallbackArgs&& rcr); private: bool _done = false; |