diff options
Diffstat (limited to 'src/mongo/db/mirror_maestro.cpp')
-rw-r--r-- | src/mongo/db/mirror_maestro.cpp | 61 |
1 files changed, 48 insertions, 13 deletions
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp index 31d9de386d9..7f898db664d 100644 --- a/src/mongo/db/mirror_maestro.cpp +++ b/src/mongo/db/mirror_maestro.cpp @@ -41,6 +41,7 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/json.h" +#include "mongo/db/client_out_of_line_executor.h" #include "mongo/db/commands.h" #include "mongo/db/commands/server_status.h" #include "mongo/db/mirror_maestro_gen.h" @@ -90,15 +91,43 @@ public: */ void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept; + /** + * Maintains the state required for mirroring requests. + */ + class MirroredRequestState { + public: + MirroredRequestState(MirrorMaestroImpl* maestro, + std::vector<HostAndPort> hosts, + std::shared_ptr<CommandInvocation> invocation, + MirroredReadsParameters params) + : _maestro(std::move(maestro)), + _hosts(std::move(hosts)), + _invocation(std::move(invocation)), + _params(std::move(params)) {} + + MirroredRequestState() = delete; + + void mirror() noexcept { + invariant(_maestro); + _maestro->_mirror(_hosts, _invocation, _params); + } + + private: + MirrorMaestroImpl* _maestro; + std::vector<HostAndPort> _hosts; + std::shared_ptr<CommandInvocation> _invocation; + MirroredReadsParameters _params; + }; + private: /** * Attempt to mirror invocation to a subset of hosts based on params * * This command is expected to only run on the _executor */ - void _mirror(std::vector<HostAndPort> hosts, + void _mirror(const std::vector<HostAndPort>& hosts, std::shared_ptr<CommandInvocation> invocation, - MirroredReadsParameters params) noexcept; + const MirroredReadsParameters& params) noexcept; /** * An enum detailing the liveness of the Maestro @@ -256,26 +285,32 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation) return; } + auto clientExecutor = ClientOutOfLineExecutor::get(Client::getCurrent()); + auto clientExecutorHandle = clientExecutor->getHandle(); + + // TODO SERVER-46619 delegates collection to the client's baton + clientExecutor->consumeAllTasks(); + // There is the potential to actually mirror requests, so schedule the _mirror() invocation // out-of-line. This means the command itself can return quickly and we do the arduous work of // building new bsons and evaluating randomness in a less important context. + auto requestState = std::make_unique<MirroredRequestState>( + this, std::move(hosts), std::move(invocation), std::move(params)); ExecutorFuture(_executor) // - .getAsync([this, - hosts = std::move(hosts), - invocation = std::move(invocation), - params = std::move(params)](const auto& status) mutable { - if (ErrorCodes::isShutdownError(status)) { - return; + .getAsync([clientExecutorHandle, + requestState = std::move(requestState)](const auto& status) mutable { + if (!ErrorCodes::isShutdownError(status)) { + invariant(status.isOK()); + requestState->mirror(); } - invariant(status.isOK()); - - _mirror(std::move(hosts), std::move(invocation), std::move(params)); + clientExecutorHandle.schedule([requestState = std::move(requestState)]( + const Status&) mutable { requestState.reset(); }); }); } -void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts, +void MirrorMaestroImpl::_mirror(const std::vector<HostAndPort>& hosts, std::shared_ptr<CommandInvocation> invocation, - MirroredReadsParameters params) noexcept try { + const MirroredReadsParameters& params) noexcept try { auto payload = [&] { BSONObjBuilder bob; |