diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2023-01-04 02:33:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-04 03:15:16 +0000 |
commit | 99cf1c7c97eb67ad7505140e20cab9d1c23d0b0c (patch) | |
tree | 0878db4d0339391c2966338539cdfb3300af8836 /src/mongo/executor/async_rpc.h | |
parent | 4ce757f499c7854c5029d432f5a473802bca1279 (diff) | |
download | mongo-99cf1c7c97eb67ad7505140e20cab9d1c23d0b0c.tar.gz |
SERVER-71230 Add baton support to async rpc senders
Diffstat (limited to 'src/mongo/executor/async_rpc.h')
-rw-r--r-- | src/mongo/executor/async_rpc.h | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h index 3c4c6cf05ec..102cf7e0bb0 100644 --- a/src/mongo/executor/async_rpc.h +++ b/src/mongo/executor/async_rpc.h @@ -86,12 +86,14 @@ struct AsyncRPCOptions { AsyncRPCOptions(CommandType cmd, std::shared_ptr<executor::TaskExecutor> exec, CancellationToken token, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) - : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {} + std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), + BatonHandle baton = nullptr) + : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy}, baton{std::move(baton)} {} CommandType cmd; std::shared_ptr<executor::TaskExecutor> exec; CancellationToken token; std::shared_ptr<RetryPolicy> retryPolicy; + BatonHandle baton; }; /** @@ -124,7 +126,22 @@ public: Targeter* targeter, OperationContext* opCtx, std::shared_ptr<TaskExecutor> exec, - CancellationToken token) = 0; + CancellationToken token, + BatonHandle baton) = 0; + ExecutorFuture<AsyncRPCInternalResponse> _sendCommand(StringData dbName, + BSONObj cmdBSON, + Targeter* targeter, + OperationContext* opCtx, + std::shared_ptr<TaskExecutor> exec, + CancellationToken token) { + return _sendCommand(std::move(dbName), + std::move(cmdBSON), + std::move(targeter), + std::move(opCtx), + std::move(exec), + std::move(token), + nullptr); + } static AsyncRPCRunner* get(ServiceContext* serviceContext); static void set(ServiceContext* serviceContext, std::unique_ptr<AsyncRPCRunner> theRunner); }; @@ -154,6 +171,31 @@ struct RetryDelayAsBackoff { RetryPolicy* _policy; }; +class ProxyingExecutor : public OutOfLineExecutor, + public std::enable_shared_from_this<ProxyingExecutor> { +public: + ProxyingExecutor(BatonHandle baton, std::shared_ptr<TaskExecutor> executor) + : _baton{std::move(baton)}, _executor{std::move(executor)} {} + + void schedule(Task func) override { + if (_baton) + return _baton->schedule(std::move(func)); + return _executor->schedule(std::move(func)); + } + + ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token) { + auto deadline = Date_t::now() + duration; + if (auto netBaton = _baton ? _baton->networking() : nullptr; netBaton) { + return netBaton->waitUntil(deadline, token).thenRunOn(shared_from_this()); + } + return _executor->sleepFor(duration, token); + } + +private: + BatonHandle _baton; + std::shared_ptr<TaskExecutor> _executor; +}; + template <typename CommandType> ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner( BSONObj cmdBSON, @@ -162,6 +204,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun OperationContext* opCtx, std::unique_ptr<Targeter> targeter) { using ReplyType = AsyncRPCResponse<typename CommandType::Reply>; + auto proxyExec = std::make_shared<ProxyingExecutor>(options->baton, options->exec); auto tryBody = [=, targeter = std::move(targeter)] { // Execute the command after extracting the db name and bson from the CommandType. // Wrapping this function allows us to separate the CommandType parsing logic from the @@ -181,7 +224,8 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun return shouldStopRetry; }) .withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get())) - .on(options->exec, CancellationToken::uncancelable()); + .on(proxyExec, CancellationToken::uncancelable()); + return std::move(resFuture) .then([](detail::AsyncRPCInternalResponse r) -> ReplyType { auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"), |