 src/mongo/s/async_requests_sender.cpp | 84
 src/mongo/s/async_requests_sender.h   | 18
 2 files changed, 66 insertions(+), 36 deletions(-)
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 7b7a14c1cd3..86fa3af6300 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -33,6 +33,8 @@
#include "mongo/s/async_requests_sender.h"
+#include <fmt/format.h>
+
#include "mongo/client/remote_command_targeter.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -45,6 +47,8 @@
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
+using namespace fmt::literals;
+
namespace mongo {
namespace {
@@ -140,21 +144,31 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
void AsyncRequestsSender::RemoteData::executeRequest() {
scheduleRequest()
.thenRunOn(*_ars->_subBaton)
- .getAsync([this](StatusWith<executor::RemoteCommandResponse> rcr) {
+ .getAsync([this](StatusWith<RemoteCommandOnAnyCallbackArgs> rcr) {
_done = true;
- _ars->_responseQueue.push({std::move(_shardId), rcr, std::move(_shardHostAndPort)});
+ if (rcr.isOK()) {
+ _ars->_responseQueue.push(
+ {std::move(_shardId), rcr.getValue().response, std::move(_shardHostAndPort)});
+ } else {
+ _ars->_responseQueue.push(
+ {std::move(_shardId), rcr.getStatus(), std::move(_shardHostAndPort)});
+ }
});
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRequest() {
- return resolveShardIdToHostAndPort(_ars->_readPreference)
+auto AsyncRequestsSender::RemoteData::scheduleRequest()
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ return resolveShardIdToHostAndPorts(_ars->_readPreference)
.thenRunOn(*_ars->_subBaton)
- .then([this](auto&& hostAndPort) { return scheduleRemoteCommand(std::move(hostAndPort)); })
+ .then([this](auto&& hostAndPorts) {
+ _shardHostAndPort.emplace(hostAndPorts.front());
+ return scheduleRemoteCommand(std::move(hostAndPorts));
+ })
.then([this](auto&& rcr) { return handleResponse(std::move(rcr)); })
.semi();
}
-SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
+SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts(
const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
if (!shard) {
@@ -162,45 +176,45 @@ SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAnd
str::stream() << "Could not find shard " << _shardId);
}
- return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20));
+ return shard->getTargeter()->findHostsWithMaxWait(readPref, Seconds(20));
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRemoteCommand(
- HostAndPort&& hostAndPort) {
- _shardHostAndPort = std::move(hostAndPort);
-
- executor::RemoteCommandRequest request(
- *_shardHostAndPort, _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx);
+auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ executor::RemoteCommandRequestOnAny request(
+ std::move(hostAndPorts), _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx);
// We have to make a promise/future pair because the TaskExecutor doesn't currently support a
// future-returning variant of scheduleRemoteCommand
- auto[p, f] = makePromiseFuture<executor::RemoteCommandResponse>();
+ auto[p, f] = makePromiseFuture<RemoteCommandOnAnyCallbackArgs>();
// Failures to schedule skip the retry loop
- uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommand(
+ uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommandOnAny(
request,
// We have to make a shared_ptr<Promise> here because scheduleRemoteCommand requires
// copyable callbacks
- [p = std::make_shared<Promise<executor::RemoteCommandResponse>>(std::move(p))](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- p->emplaceValue(cbData.response);
- },
+ [p = std::make_shared<Promise<RemoteCommandOnAnyCallbackArgs>>(std::move(p))](
+ const RemoteCommandOnAnyCallbackArgs& cbData) { p->emplaceValue(cbData); },
*_ars->_subBaton));
return std::move(f).semi();
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::handleResponse(
- executor::RemoteCommandResponse&& rcr) {
- auto status = rcr.status;
+auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackArgs&& rcr)
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ if (rcr.response.target) {
+ _shardHostAndPort = rcr.response.target;
+ }
+
+ auto status = rcr.response.status;
if (status.isOK()) {
- status = getStatusFromCommandResult(rcr.data);
+ status = getStatusFromCommandResult(rcr.response.data);
}
if (status.isOK()) {
- status = getWriteConcernStatusFromCommandResult(rcr.data);
+ status = getWriteConcernStatusFromCommandResult(rcr.response.data);
}
// If we're okay (RemoteCommandResponse, command result, and write concern)-wise, we're done.
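
The promise/future pair built in scheduleRemoteCommand above is a general bridge from a callback-style scheduler to a future-returning one. A minimal standalone analogue, assuming std::promise/std::future in place of mongo's Promise/Future (whose API differs, e.g. emplaceValue vs. set_value) and a made-up scheduleWithCallback standing in for scheduleRemoteCommandOnAny:

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>

    // Made-up callback-based scheduler, standing in for the TaskExecutor's
    // scheduleRemoteCommandOnAny; like it, this requires a copyable callback.
    void scheduleWithCallback(std::function<void(std::string)> cb) {
        cb("response payload");
    }

    std::future<std::string> scheduleAsFuture() {
        // Create the promise/future pair up front and hand the promise to the
        // callback. std::promise is move-only, so it is wrapped in a shared_ptr
        // to make the lambda copyable -- the same reason the patch wraps mongo's
        // Promise in a std::shared_ptr.
        auto promise = std::make_shared<std::promise<std::string>>();
        auto future = promise->get_future();
        scheduleWithCallback(
            [promise](std::string result) { promise->set_value(std::move(result)); });
        return future;
    }

    int main() {
        std::cout << scheduleAsFuture().get() << '\n';
        return 0;
    }
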
@@ -214,14 +228,26 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han
if (!shard) {
uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId);
} else {
- if (_shardHostAndPort) {
- shard->updateReplSetMonitor(*_shardHostAndPort, status);
+ std::vector<HostAndPort> failedTargets;
+
+ if (rcr.response.target) {
+ failedTargets = {*rcr.response.target};
+ } else {
+ failedTargets = rcr.request.target;
}
+
+ shard->updateReplSetMonitor(failedTargets.front(), status);
+
if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
_retryCount < kMaxNumFailedHostRetryAttempts) {
- LOG(1) << "Command to remote " << _shardId << " at host " << *_shardHostAndPort
- << " failed with retriable error and will be retried "
+
+ LOG(1) << "Command to remote " << _shardId
+                   << (failedTargets.empty() ? "" : (failedTargets.size() > 1 ? " for hosts "
+                                                                              : " at host "))
+                   << "{}"_format(fmt::join(failedTargets, ", "))
+                   << " failed with retriable error and will be retried "
<< causedBy(redact(status));
+
++_retryCount;
_shardHostAndPort.reset();
// retry through recursion
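
The log line above leans on fmt's join adapter plus the _format user-defined literal pulled in by the new include and using-directive. A standalone check of that formatting, assuming a reasonably recent fmt (fmt::join now lives in fmt/ranges.h, while the fmt vintage this patch builds against still exported it from fmt/format.h, and the _format literal was removed in fmt 9):

    #include <fmt/format.h>
    #include <fmt/ranges.h>  // fmt::join's current home

    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::string> failedTargets{"shard0:27017", "shard1:27017"};
        // Portable spelling; the patch's `"{}"_format(fmt::join(...))` UDL form
        // is equivalent on older fmt releases.
        std::string hosts = fmt::format("{}", fmt::join(failedTargets, ", "));
        std::cout << hosts << '\n';  // prints: shard0:27017, shard1:27017
        return 0;
    }
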
@@ -230,7 +256,7 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han
}
// Statuses in the response.status field that aren't retried get converted to top-level errors
- uassertStatusOK(rcr.status);
+ uassertStatusOK(rcr.response.status);
// We're not okay (on the remote), but still not going to retry
return std::move(rcr);
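
The failure path in handleResponse above has to decide which hosts to blame before notifying the replica set monitor: the single host the response names as its responder when one is known, otherwise every host the OnAny request fanned out over. A standalone sketch of just that selection, using plain std types with illustrative names (mongo uses boost::optional and HostAndPort):

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Mirrors the patch's selection: prefer the concrete responder recorded in
    // response.target, otherwise fall back to the request's full candidate list.
    std::vector<std::string> pickFailedTargets(const std::optional<std::string>& responder,
                                               const std::vector<std::string>& requested) {
        if (responder) {
            return {*responder};
        }
        return requested;
    }

    int main() {
        std::vector<std::string> requested{"hostA:27017", "hostB:27017"};
        std::cout << pickFailedTargets("hostB:27017", requested).size() << '\n';  // 1: responder known
        std::cout << pickFailedTargets(std::nullopt, requested).size() << '\n';   // 2: blame the fan-out
        return 0;
    }
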
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 86387d01bbe..95c457e2e7f 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -168,6 +168,9 @@ private:
*/
class RemoteData {
public:
+ using RemoteCommandOnAnyCallbackArgs =
+ executor::TaskExecutor::RemoteCommandOnAnyCallbackArgs;
+
/**
* Creates a new uninitialized remote state with a command to send.
*/
@@ -210,24 +213,25 @@ private:
*
* for the given shard.
*/
- SemiFuture<executor::RemoteCommandResponse> scheduleRequest();
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRequest();
/**
- * Given a read preference, selects a host on which the command should be run.
+     * Given a read preference, selects a list of hosts on which the command can run.
*/
- SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
+ SemiFuture<std::vector<HostAndPort>> resolveShardIdToHostAndPorts(
+ const ReadPreferenceSetting& readPref);
/**
* Schedules the remote command on the ARS's TaskExecutor
*/
- SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand(
- HostAndPort&& hostAndPort);
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRemoteCommand(
+        std::vector<HostAndPort>&& hostAndPorts);
/**
* Handles the remote response
*/
- SemiFuture<executor::RemoteCommandResponse> handleResponse(
- executor::RemoteCommandResponse&& rcr);
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> handleResponse(
+ RemoteCommandOnAnyCallbackArgs&& rcr);
private:
bool _done = false;
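
Taken together, the header changes spell out RemoteData's new pipeline: resolve all eligible hosts, schedule on any of them, then handle the response, recording a nominal target up front and replacing it with the actual responder once the response names one. A toy, synchronous analogue of that flow (illustrative types only, not mongo's futures or executor API):

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Toy response type; the real code receives a RemoteCommandOnAnyCallbackArgs
    // whose response.target names the host that actually answered.
    struct Response {
        std::optional<std::string> target;
        bool ok;
    };

    class RemoteDataSketch {
    public:
        void executeRequest() {
            auto hosts = resolveHosts();             // resolveShardIdToHostAndPorts
            _nominalTarget = hosts.front();          // the patch records front() before sending
            auto rsp = sendToAny(std::move(hosts));  // scheduleRemoteCommand (OnAny)
            handle(rsp);                             // handleResponse
        }

    private:
        std::vector<std::string> resolveHosts() {
            return {"hostA:27017", "hostB:27017"};
        }
        Response sendToAny(std::vector<std::string>&&) {
            return {std::optional<std::string>{"hostB:27017"}, true};
        }
        void handle(const Response& r) {
            if (r.target) {
                _nominalTarget = *r.target;  // prefer the host that actually answered
            }
            std::cout << (r.ok ? "served by " : "failed at ") << *_nominalTarget << '\n';
        }
        std::optional<std::string> _nominalTarget;
    };

    int main() {
        RemoteDataSketch{}.executeRequest();
        return 0;
    }
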