summaryrefslogtreecommitdiff
path: root/src/mongo/db/mirror_maestro.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/mirror_maestro.cpp')
-rw-r--r--src/mongo/db/mirror_maestro.cpp61
1 files changed, 48 insertions, 13 deletions
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp
index 31d9de386d9..7f898db664d 100644
--- a/src/mongo/db/mirror_maestro.cpp
+++ b/src/mongo/db/mirror_maestro.cpp
@@ -41,6 +41,7 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
+#include "mongo/db/client_out_of_line_executor.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/server_status.h"
#include "mongo/db/mirror_maestro_gen.h"
@@ -90,15 +91,43 @@ public:
*/
void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept;
+ /**
+ * Maintains the state required for mirroring requests.
+ */
+ class MirroredRequestState {
+ public:
+ MirroredRequestState(MirrorMaestroImpl* maestro,
+ std::vector<HostAndPort> hosts,
+ std::shared_ptr<CommandInvocation> invocation,
+ MirroredReadsParameters params)
+ : _maestro(std::move(maestro)),
+ _hosts(std::move(hosts)),
+ _invocation(std::move(invocation)),
+ _params(std::move(params)) {}
+
+ MirroredRequestState() = delete;
+
+ void mirror() noexcept {
+ invariant(_maestro);
+ _maestro->_mirror(_hosts, _invocation, _params);
+ }
+
+ private:
+ MirrorMaestroImpl* _maestro;
+ std::vector<HostAndPort> _hosts;
+ std::shared_ptr<CommandInvocation> _invocation;
+ MirroredReadsParameters _params;
+ };
+
private:
/**
* Attempt to mirror invocation to a subset of hosts based on params
*
* This command is expected to only run on the _executor
*/
- void _mirror(std::vector<HostAndPort> hosts,
+ void _mirror(const std::vector<HostAndPort>& hosts,
std::shared_ptr<CommandInvocation> invocation,
- MirroredReadsParameters params) noexcept;
+ const MirroredReadsParameters& params) noexcept;
/**
* An enum detailing the liveness of the Maestro
@@ -256,26 +285,32 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation)
return;
}
+ auto clientExecutor = ClientOutOfLineExecutor::get(Client::getCurrent());
+ auto clientExecutorHandle = clientExecutor->getHandle();
+
+ // TODO SERVER-46619 delegates collection to the client's baton
+ clientExecutor->consumeAllTasks();
+
// There is the potential to actually mirror requests, so schedule the _mirror() invocation
// out-of-line. This means the command itself can return quickly and we do the arduous work of
// building new bsons and evaluating randomness in a less important context.
+ auto requestState = std::make_unique<MirroredRequestState>(
+ this, std::move(hosts), std::move(invocation), std::move(params));
ExecutorFuture(_executor) //
- .getAsync([this,
- hosts = std::move(hosts),
- invocation = std::move(invocation),
- params = std::move(params)](const auto& status) mutable {
- if (ErrorCodes::isShutdownError(status)) {
- return;
+ .getAsync([clientExecutorHandle,
+ requestState = std::move(requestState)](const auto& status) mutable {
+ if (!ErrorCodes::isShutdownError(status)) {
+ invariant(status.isOK());
+ requestState->mirror();
}
- invariant(status.isOK());
-
- _mirror(std::move(hosts), std::move(invocation), std::move(params));
+ clientExecutorHandle.schedule([requestState = std::move(requestState)](
+ const Status&) mutable { requestState.reset(); });
});
}
-void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts,
+void MirrorMaestroImpl::_mirror(const std::vector<HostAndPort>& hosts,
std::shared_ptr<CommandInvocation> invocation,
- MirroredReadsParameters params) noexcept try {
+ const MirroredReadsParameters& params) noexcept try {
auto payload = [&] {
BSONObjBuilder bob;