summaryrefslogtreecommitdiff
path: root/src/mongo/executor/async_rpc.h
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2023-01-04 02:33:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-04 03:15:16 +0000
commit99cf1c7c97eb67ad7505140e20cab9d1c23d0b0c (patch)
tree0878db4d0339391c2966338539cdfb3300af8836 /src/mongo/executor/async_rpc.h
parent4ce757f499c7854c5029d432f5a473802bca1279 (diff)
downloadmongo-99cf1c7c97eb67ad7505140e20cab9d1c23d0b0c.tar.gz
SERVER-71230 Add baton support to async rpc senders
Diffstat (limited to 'src/mongo/executor/async_rpc.h')
-rw-r--r--src/mongo/executor/async_rpc.h52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h
index 3c4c6cf05ec..102cf7e0bb0 100644
--- a/src/mongo/executor/async_rpc.h
+++ b/src/mongo/executor/async_rpc.h
@@ -86,12 +86,14 @@ struct AsyncRPCOptions {
AsyncRPCOptions(CommandType cmd,
std::shared_ptr<executor::TaskExecutor> exec,
CancellationToken token,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>())
- : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {}
+ std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
+ BatonHandle baton = nullptr)
+ : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy}, baton{std::move(baton)} {}
CommandType cmd;
std::shared_ptr<executor::TaskExecutor> exec;
CancellationToken token;
std::shared_ptr<RetryPolicy> retryPolicy;
+ BatonHandle baton;
};
/**
@@ -124,7 +126,22 @@ public:
Targeter* targeter,
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
- CancellationToken token) = 0;
+ CancellationToken token,
+ BatonHandle baton) = 0;
+ ExecutorFuture<AsyncRPCInternalResponse> _sendCommand(StringData dbName,
+ BSONObj cmdBSON,
+ Targeter* targeter,
+ OperationContext* opCtx,
+ std::shared_ptr<TaskExecutor> exec,
+ CancellationToken token) {
+ return _sendCommand(std::move(dbName),
+ std::move(cmdBSON),
+ std::move(targeter),
+ std::move(opCtx),
+ std::move(exec),
+ std::move(token),
+ nullptr);
+ }
static AsyncRPCRunner* get(ServiceContext* serviceContext);
static void set(ServiceContext* serviceContext, std::unique_ptr<AsyncRPCRunner> theRunner);
};
@@ -154,6 +171,31 @@ struct RetryDelayAsBackoff {
RetryPolicy* _policy;
};
+class ProxyingExecutor : public OutOfLineExecutor,
+ public std::enable_shared_from_this<ProxyingExecutor> {
+public:
+ ProxyingExecutor(BatonHandle baton, std::shared_ptr<TaskExecutor> executor)
+ : _baton{std::move(baton)}, _executor{std::move(executor)} {}
+
+ void schedule(Task func) override {
+ if (_baton)
+ return _baton->schedule(std::move(func));
+ return _executor->schedule(std::move(func));
+ }
+
+ ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token) {
+ auto deadline = Date_t::now() + duration;
+ if (auto netBaton = _baton ? _baton->networking() : nullptr; netBaton) {
+ return netBaton->waitUntil(deadline, token).thenRunOn(shared_from_this());
+ }
+ return _executor->sleepFor(duration, token);
+ }
+
+private:
+ BatonHandle _baton;
+ std::shared_ptr<TaskExecutor> _executor;
+};
+
template <typename CommandType>
ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner(
BSONObj cmdBSON,
@@ -162,6 +204,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
OperationContext* opCtx,
std::unique_ptr<Targeter> targeter) {
using ReplyType = AsyncRPCResponse<typename CommandType::Reply>;
+ auto proxyExec = std::make_shared<ProxyingExecutor>(options->baton, options->exec);
auto tryBody = [=, targeter = std::move(targeter)] {
// Execute the command after extracting the db name and bson from the CommandType.
// Wrapping this function allows us to separate the CommandType parsing logic from the
@@ -181,7 +224,8 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
return shouldStopRetry;
})
.withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get()))
- .on(options->exec, CancellationToken::uncancelable());
+ .on(proxyExec, CancellationToken::uncancelable());
+
return std::move(resFuture)
.then([](detail::AsyncRPCInternalResponse r) -> ReplyType {
auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"),