summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-06-06 13:49:44 -0400
committerJason Carey <jcarey@argv.me>2019-06-07 15:54:40 -0400
commit4fc54eade0028a9976f6c2556fa7c169f66f2ba0 (patch)
tree050c7009729d3c68810b993010ad9dc0dc0526f5
parent13f72f733b6d71f78155f77b4b84a559a2627ff5 (diff)
downloadmongo-4fc54eade0028a9976f6c2556fa7c169f66f2ba0.tar.gz
SERVER-41132 Opportunistic targeting for ARS
Inside the ARS, support use of the TaskExecutor::scheduleRemoteCommandOnAny method, which will allow the ARS to use any acceptable host returned from targeting, rather than requiring us to use one at random. This should allow us to prefer routing requests to hosts to which we already have ready connections, or to which we can establish ready connections faster (cherry picked from commit 8eed2fe5376ff6843ab1fe7881c8377812215185)
-rw-r--r--src/mongo/s/async_requests_sender.cpp84
-rw-r--r--src/mongo/s/async_requests_sender.h18
2 files changed, 66 insertions, 36 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 7b7a14c1cd3..86fa3af6300 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -33,6 +33,8 @@
#include "mongo/s/async_requests_sender.h"
+#include <fmt/format.h>
+
#include "mongo/client/remote_command_targeter.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -45,6 +47,8 @@
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
+using namespace fmt::literals;
+
namespace mongo {
namespace {
@@ -140,21 +144,31 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
void AsyncRequestsSender::RemoteData::executeRequest() {
scheduleRequest()
.thenRunOn(*_ars->_subBaton)
- .getAsync([this](StatusWith<executor::RemoteCommandResponse> rcr) {
+ .getAsync([this](StatusWith<RemoteCommandOnAnyCallbackArgs> rcr) {
_done = true;
- _ars->_responseQueue.push({std::move(_shardId), rcr, std::move(_shardHostAndPort)});
+ if (rcr.isOK()) {
+ _ars->_responseQueue.push(
+ {std::move(_shardId), rcr.getValue().response, std::move(_shardHostAndPort)});
+ } else {
+ _ars->_responseQueue.push(
+ {std::move(_shardId), rcr.getStatus(), std::move(_shardHostAndPort)});
+ }
});
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRequest() {
- return resolveShardIdToHostAndPort(_ars->_readPreference)
+auto AsyncRequestsSender::RemoteData::scheduleRequest()
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ return resolveShardIdToHostAndPorts(_ars->_readPreference)
.thenRunOn(*_ars->_subBaton)
- .then([this](auto&& hostAndPort) { return scheduleRemoteCommand(std::move(hostAndPort)); })
+ .then([this](auto&& hostAndPorts) {
+ _shardHostAndPort.emplace(hostAndPorts.front());
+ return scheduleRemoteCommand(std::move(hostAndPorts));
+ })
.then([this](auto&& rcr) { return handleResponse(std::move(rcr)); })
.semi();
}
-SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
+SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts(
const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
if (!shard) {
@@ -162,45 +176,45 @@ SemiFuture<HostAndPort> AsyncRequestsSender::RemoteData::resolveShardIdToHostAnd
str::stream() << "Could not find shard " << _shardId);
}
- return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20));
+ return shard->getTargeter()->findHostsWithMaxWait(readPref, Seconds(20));
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::scheduleRemoteCommand(
- HostAndPort&& hostAndPort) {
- _shardHostAndPort = std::move(hostAndPort);
-
- executor::RemoteCommandRequest request(
- *_shardHostAndPort, _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx);
+auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ executor::RemoteCommandRequestOnAny request(
+ std::move(hostAndPorts), _ars->_db, _cmdObj, _ars->_metadataObj, _ars->_opCtx);
// We have to make a promise future pair because the TaskExecutor doesn't currently support a
// future returning variant of scheduleRemoteCommand
- auto[p, f] = makePromiseFuture<executor::RemoteCommandResponse>();
+ auto[p, f] = makePromiseFuture<RemoteCommandOnAnyCallbackArgs>();
// Failures to schedule skip the retry loop
- uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommand(
+ uassertStatusOK(_ars->_subExecutor->scheduleRemoteCommandOnAny(
request,
// We have to make a shared_ptr<Promise> here because scheduleRemoteCommand requires
// copyable callbacks
- [p = std::make_shared<Promise<executor::RemoteCommandResponse>>(std::move(p))](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- p->emplaceValue(cbData.response);
- },
+ [p = std::make_shared<Promise<RemoteCommandOnAnyCallbackArgs>>(std::move(p))](
+ const RemoteCommandOnAnyCallbackArgs& cbData) { p->emplaceValue(cbData); },
*_ars->_subBaton));
return std::move(f).semi();
}
-SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::handleResponse(
- executor::RemoteCommandResponse&& rcr) {
- auto status = rcr.status;
+auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackArgs&& rcr)
+ -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ if (rcr.response.target) {
+ _shardHostAndPort = rcr.response.target;
+ }
+
+ auto status = rcr.response.status;
if (status.isOK()) {
- status = getStatusFromCommandResult(rcr.data);
+ status = getStatusFromCommandResult(rcr.response.data);
}
if (status.isOK()) {
- status = getWriteConcernStatusFromCommandResult(rcr.data);
+ status = getWriteConcernStatusFromCommandResult(rcr.response.data);
}
// If we're okay (RemoteCommandResponse, command result and write concern)-wise we're done.
@@ -214,14 +228,26 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han
if (!shard) {
uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId);
} else {
- if (_shardHostAndPort) {
- shard->updateReplSetMonitor(*_shardHostAndPort, status);
+ std::vector<HostAndPort> failedTargets;
+
+ if (rcr.response.target) {
+ failedTargets = {*rcr.response.target};
+ } else {
+ failedTargets = rcr.request.target;
}
+
+ shard->updateReplSetMonitor(failedTargets.front(), status);
+
if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
_retryCount < kMaxNumFailedHostRetryAttempts) {
- LOG(1) << "Command to remote " << _shardId << " at host " << *_shardHostAndPort
- << " failed with retriable error and will be retried "
+
+ LOG(1) << "Command to remote " << _shardId
+ << (failedTargets.empty() ? " " : (failedTargets.size() > 1 ? " for hosts "
+ : " at host "))
+ << "{}"_format(fmt::join(failedTargets, ", "))
+ << "failed with retriable error and will be retried "
<< causedBy(redact(status));
+
++_retryCount;
_shardHostAndPort.reset();
// retry through recursion
@@ -230,7 +256,7 @@ SemiFuture<executor::RemoteCommandResponse> AsyncRequestsSender::RemoteData::han
}
// Status' in the response.status field that aren't retried get converted to top level errors
- uassertStatusOK(rcr.status);
+ uassertStatusOK(rcr.response.status);
// We're not okay (on the remote), but still not going to retry
return std::move(rcr);
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 86387d01bbe..95c457e2e7f 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -168,6 +168,9 @@ private:
*/
class RemoteData {
public:
+ using RemoteCommandOnAnyCallbackArgs =
+ executor::TaskExecutor::RemoteCommandOnAnyCallbackArgs;
+
/**
* Creates a new uninitialized remote state with a command to send.
*/
@@ -210,24 +213,25 @@ private:
*
* for the given shard.
*/
- SemiFuture<executor::RemoteCommandResponse> scheduleRequest();
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRequest();
/**
- * Given a read preference, selects a host on which the command should be run.
+ * Given a read preference, selects a list of hosts on which the command can run.
*/
- SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
+ SemiFuture<std::vector<HostAndPort>> resolveShardIdToHostAndPorts(
+ const ReadPreferenceSetting& readPref);
/**
* Schedules the remote command on the ARS's TaskExecutor
*/
- SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand(
- HostAndPort&& hostAndPort);
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> scheduleRemoteCommand(
+ std::vector<HostAndPort>&& hostAndPort);
/**
* Handles the remote response
*/
- SemiFuture<executor::RemoteCommandResponse> handleResponse(
- executor::RemoteCommandResponse&& rcr);
+ SemiFuture<RemoteCommandOnAnyCallbackArgs> handleResponse(
+ RemoteCommandOnAnyCallbackArgs&& rcr);
private:
bool _done = false;