diff options
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 84 |
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 18 |
2 files changed, 66 insertions, 36 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 7b7a14c1cd3..86fa3af6300 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -33,6 +33,8 @@ #include "mongo/s/async_requests_sender.h" +#include <fmt/format.h> + #include "mongo/client/remote_command_targeter.h" #include "mongo/executor/remote_command_request.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -45,6 +47,8 @@ #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" +using namespace fmt::literals; + namespace mongo { namespace { @@ -140,21 +144,31 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() { void AsyncRequestsSender::RemoteData::executeRequest() { scheduleRequest() .thenRunOn(*_ars->_subBaton) - .getAsync([this](StatusWith<executor::RemoteCommandResponse> rcr) { + .getAsync([this](StatusWith<RemoteCommandOnAnyCallbackArgs> rcr) { _done = true; - _ars->_responseQueue.push({std::move(_shardId), rcr, std::move(_shardHostAndPort)}); + if (rcr.isOK()) { + _ars->_responseQueue.push( + {std::move(_shardId), rcr.getValue().response, std::move(_shardHostAndPort)}); + } else { + _ars->_responseQueue.push( + {std::move(_shardId), rcr.getStatus(), std::move(_shardHostAndPort)}); + } }); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRequest() { - return resolveShardIdToHostAndPort(_ars->_readPreference) +auto AsyncRequestsSender::RemoteData::scheduleRequest() + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + return resolveShardIdToHostAndPorts(_ars->_readPreference) .thenRunOn(*_ars->_subBaton) - .then([this](auto&& hostAndPort) { return scheduleRemoteCommand(std::move(hostAndPort)); }) + .then([this](auto&& hostAndPorts) { + _shardHostAndPort.emplace(hostAndPorts.front()); + return scheduleRemoteCommand(std::move(hostAndPorts)); + }) .then([this](auto&& rcr) { return handleResponse(std::move(rcr)); }) .semi(); } -SemiFuture<HostAndPort> 
AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( +SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts( const ReadPreferenceSetting& readPref) { const auto shard = getShard(); if (!shard) { @@ -162,45 +176,45 @@ SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAnd str::stream() << "Could not find shard " << _shardId); } - return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)); + return shard->getTargeter()->findHostsWithMaxWait(readPref, Seconds(20)); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRemoteCommand( - HostAndPort&& hostAndPort) { - _shardHostAndPort = std::move(hostAndPort); - - executor::RemoteCommandRequest request( - *_shardHostAndPort, _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx); +auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts) + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + executor::RemoteCommandRequestOnAny request( + std::move(hostAndPorts), _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx); // We have to make a promise future pair because the TaskExecutor doesn't currently support a // future returning variant of scheduleRemoteCommand - auto[p, f] = makePromiseFuture<executor::RemoteCommandResponse>(); + auto[p, f] = makePromiseFuture<RemoteCommandOnAnyCallbackArgs>(); // Failures to schedule skip the retry loop - uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommand( + uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommandOnAny( request, // We have to make a shared_ptr<Promise> here because scheduleRemoteCommand requires // copyable callbacks - [p = std::make_shared<Promise<executor::RemoteCommandResponse>>(std::move(p))]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - p->emplaceValue(cbData.response); - }, + [p = std::make_shared<Promise<RemoteCommandOnAnyCallbackArgs>>(std::move(p))]( + const 
RemoteCommandOnAnyCallbackArgs& cbData) { p->emplaceValue(cbData); }, *_ars->_subBaton)); return std::move(f).semi(); } -SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::handleResponse( - executor::RemoteCommandResponse&& rcr) { - auto status = rcr.status; +auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackArgs&& rcr) + -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + if (rcr.response.target) { + _shardHostAndPort = rcr.response.target; + } + + auto status = rcr.response.status; if (status.isOK()) { - status = getStatusFromCommandResult(rcr.data); + status = getStatusFromCommandResult(rcr.response.data); } if (status.isOK()) { - status = getWriteConcernStatusFromCommandResult(rcr.data); + status = getWriteConcernStatusFromCommandResult(rcr.response.data); } // If we're okay (RemoteCommandResponse, command result and write concern)-wise we're done. @@ -214,14 +228,26 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han if (!shard) { uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId); } else { - if (_shardHostAndPort) { - shard->updateReplSetMonitor(*_shardHostAndPort, status); + std::vector<HostAndPort> failedTargets; + + if (rcr.response.target) { + failedTargets = {*rcr.response.target}; + } else { + failedTargets = rcr.request.target; } + + shard->updateReplSetMonitor(failedTargets.front(), status); + if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) && _retryCount < kMaxNumFailedHostRetryAttempts) { - LOG(1) << "Command to remote " << _shardId << " at host " << *_shardHostAndPort - << " failed with retriable error and will be retried " + + LOG(1) << "Command to remote " << _shardId + << (failedTargets.empty() ? " " : (failedTargets.size() > 1 ? 
" for hosts " + : " at host ")) + << "{}"_format(fmt::join(failedTargets, ", ")) + << "failed with retriable error and will be retried " << causedBy(redact(status)); + ++_retryCount; _shardHostAndPort.reset(); // retry through recursion @@ -230,7 +256,7 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han } // Status' in the response.status field that aren't retried get converted to top level errors - uassertStatusOK(rcr.status); + uassertStatusOK(rcr.response.status); // We're not okay (on the remote), but still not going to retry return std::move(rcr); diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 86387d01bbe..95c457e2e7f 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -168,6 +168,9 @@ private: */ class RemoteData { public: + using RemoteCommandOnAnyCallbackArgs = + executor::TaskExecutor::RemoteCommandOnAnyCallbackArgs; + /** * Creates a new uninitialized remote state with a command to send. */ @@ -210,24 +213,25 @@ private: * * for the given shard. */ - SemiFuture<executor::RemoteCommandResponse> scheduleRequest(); + SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRequest(); /** - * Given a read preference, selects a host on which the command should be run. + * Given a read preference, selects a lists of hosts on which the command can run. 
*/ - SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); + SemiFuture<std::vector<HostAndPort>> resolveShardIdToHostAndPorts( + const ReadPreferenceSetting& readPref); /** * Schedules the remote command on the ARS's TaskExecutor */ - SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand( - HostAndPort&& hostAndPort); + SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRemoteCommand( + std::vector<HostAndPort>&& hostAndPort); /** * Handles the remote response */ - SemiFuture<executor::RemoteCommandResponse> handleResponse( - executor::RemoteCommandResponse&& rcr); + SemiFuture<RemoteCommandOnAnyCallbackArgs> handleResponse( + RemoteCommandOnAnyCallbackArgs&& rcr); private: bool _done = false; |