diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-01-28 20:01:11 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-14 21:05:34 +0000 |
commit | 6d163783f03e07a172154eaa0fa270c3c05c4d08 (patch) | |
tree | 93531ccec5db6babe5e8387014ec733e195fea08 /src/mongo | |
parent | 9d837976a3c9b408df43f3e6c38ceb3837b0650e (diff) | |
download | mongo-6d163783f03e07a172154eaa0fa270c3c05c4d08.tar.gz |
SERVER-45814 Define subsystem to mirror read command requests
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/db/commands.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 47 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.cpp | 251 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.h | 78 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.idl | 57 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 7 |
11 files changed, 516 insertions, 8 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 57cdf022a47..c57724f3566 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -402,6 +402,7 @@ mongod = env.Program( 'db/logical_session_cache_factory_mongod', 'db/logical_time_metadata_hook', 'db/matcher/expressions_mongod_only', + 'db/mirror_maestro', 'db/mongod_options', 'db/ops/write_ops_parsers', 'db/periodic_runner_job_abort_expired_transactions', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f1f41daa4c2..c1099bfa5bf 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -465,6 +465,28 @@ env.Clone().InjectModule("enterprise").Library( ) env.Library( + target='mirror_maestro', + source=[ + 'mirror_maestro.cpp', + env.Idlc('mirror_maestro.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/task_executor_interface', + 'service_context', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/bson/util/bson_extract', + '$BUILD_DIR/mongo/executor/network_interface_factory', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/idl/idl_parser', + 'repl/replica_set_messages', + 'repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + ], +) + +env.Library( target="commands", source=[ 'commands.cpp', diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 6a94edbf374..e29e7128410 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -135,8 +135,15 @@ const StringMap<int> txnAdminCommands = {{"abortTransaction", 1}, {"coordinateCommitTransaction", 1}, {"prepareTransaction", 1}}; +auto getCommandInvocationHooksHandle = + ServiceContext::declareDecoration<std::shared_ptr<CommandInvocationHooks>>(); + } // namespace +void CommandInvocationHooks::set(ServiceContext* serviceContext, + std::shared_ptr<CommandInvocationHooks> hooks) { + getCommandInvocationHooksHandle(serviceContext) = std::move(hooks); +} ////////////////////////////////////////////////////////////// // CommandHelpers @@ -165,6 +172,23 @@ BSONObj CommandHelpers::runCommandDirectly(OperationContext* opCtx, const OpMsgR return replyBuilder.releaseBody(); } +void CommandHelpers::runCommandInvocation(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation, + rpc::ReplyBuilderInterface* response) { + auto hooks = getCommandInvocationHooksHandle(opCtx->getServiceContext()); + + if (hooks) { + hooks->onBeforeRun(opCtx, request, invocation); + } + + invocation->run(opCtx, response); + + if (hooks) { + hooks->onAfterRun(opCtx, request, invocation); + } +} + void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx, const CommandInvocation* invocation, const OpMsgRequest& request, diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 8590d3b9f16..a9467d34c0b 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -67,6 +67,36 @@ namespace mutablebson { class Document; } // namespace mutablebson +/** + * A simple set of type-erased hooks for pre and post command actions. + * + * These hooks will only run on external requests that form CommandInvocations (a.k.a. OP_MSG + * requests). They are not applied for runCommandDirectly() or raw CommandInvocation::run() calls. + */ +class CommandInvocationHooks { +public: + /** + * Set the current hooks + */ + static void set(ServiceContext* serviceContext, std::shared_ptr<CommandInvocationHooks> hooks); + + virtual ~CommandInvocationHooks() = default; + + /** + * A behavior to perform before CommandInvocation::run() + */ + virtual void onBeforeRun(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation) = 0; + + /** + * A behavior to perform after CommandInvocation::run() + */ + virtual void onAfterRun(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation) = 0; +}; + // Various helpers unrelated to any single command or to the command registry. // Would be a namespace, but want to keep it closed rather than open. // Some of these may move to the BasicCommand shim if they are only for legacy implementations. @@ -194,6 +224,16 @@ struct CommandHelpers { static BSONObj runCommandDirectly(OperationContext* opCtx, const OpMsgRequest& request); /** + * Runs a previously parsed CommandInvocation and propagates the result to the + * ReplyBuilderInterface. This function is agnostic to the derived type of the CommandInvocation + * but may mirror, forward, or do other supplementary actions with the request. + */ + static void runCommandInvocation(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation, + rpc::ReplyBuilderInterface* response); + + /** * If '!invocation', we're logging about a Command pre-parse. It has to punt on the logged * namespace, giving only the request's $db. Since the Command hasn't parsed the request body, * we can't know the collection part of that namespace, so we leave it blank in the audit log. @@ -507,6 +547,13 @@ public: } /** + * Returns this invocation's support for readMirroring. + */ + virtual bool supportsReadMirroring() const { + return false; + } + + /** * Returns true if command allows afterClusterTime in its readConcern. The command may not allow * it if it is specifically intended not to take any LockManager locks. Waiting for * afterClusterTime takes the MODE_IS lock. diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 4519c4e3c2d..abeb4641446 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -193,6 +193,10 @@ public: return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } + bool supportsReadMirroring() const override { + return true; + } + bool canIgnorePrepareConflicts() const override { return true; } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a893d1c0059..b7f1c0945c8 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -96,6 +96,7 @@ #include "mongo/db/logical_session_cache_factory_mongod.h" #include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/mirror_maestro.h" #include "mongo/db/mongod_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer_impl.h" @@ -275,6 +276,22 @@ void initWireSpec() { spec.isInternalClient = true; } +void initializeCommandHooks(ServiceContext* serviceContext) { + class MongodCommandInvocationHooks final : public CommandInvocationHooks { + void onBeforeRun(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation) {} + void onAfterRun(OperationContext* opCtx, + const OpMsgRequest& request, + CommandInvocation* invocation) { + MirrorMaestro::tryMirror(opCtx, request, invocation); + } + }; + + MirrorMaestro::init(serviceContext); + CommandInvocationHooks::set(serviceContext, std::make_shared<MongodCommandInvocationHooks>()); +} + MONGO_FAIL_POINT_DEFINE(shutdownAtStartup); ExitCode _initAndListen(int listenPort) { @@ -698,6 +715,8 @@ ExitCode _initAndListen(int listenPort) { LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheD(kind)); + initializeCommandHooks(serviceContext); + // MessageServer::run will return when exit code closes its socket and we don't need the // operation context anymore startupOpCtx.reset(); @@ -1033,6 +1052,8 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { } } + MirrorMaestro::shutdown(serviceContext); + WaitForMajorityService::get(serviceContext).shutDown(); // Terminate the balancer thread so it doesn't leak memory. diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp new file mode 100644 index 00000000000..a31bd43dd30 --- /dev/null +++ b/src/mongo/db/mirror_maestro.cpp @@ -0,0 +1,251 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/mirror_maestro.h" + +#include <utility> + +#include <fmt/format.h> + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands.h" +#include "mongo/db/mirror_maestro_gen.h" +#include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/synchronized_value.h" + +namespace mongo { + +class MirrorMaestroImpl { +public: + /** + * Make the TaskExecutor and initialize other components + */ + void init(ServiceContext* serviceContext) noexcept; + + /** + * Shutdown the TaskExecutor and cancel any outstanding work + */ + void shutdown() noexcept; + + /** + * Mirror only if this maestro has been initialized + */ + void tryMirror(OperationContext* opCtx, + const OpMsgRequest& request, + const CommandInvocation* invocation) noexcept; + +private: + /** + * An enum detailing the liveness of the Maestro + * + * The state transition map for liveness looks like so: + * kUninitialized -> kRunning, kShutdown + * kRunning -> kShutdown + * kShutdown -> null + */ + enum class Liveness { + kUninitialized, + kRunning, + kShutdown, + }; + + // InitializationGuard guards and serializes the initialization and shutdown of members + struct InitializationGuard { + Mutex mutex = MONGO_MAKE_LATCH("MirrorMaestroImpl::InitializationGuard::mutex"); + Liveness liveness; + }; + InitializationGuard _initGuard; + + // _isInitialized guards the use of heap allocated members like _executor + // Even if _isInitialized is true, any member function of the variables below must still be + // inately thread safe. If _isInitialized is false, there may not even be correct pointers to + // call member functions upon. + AtomicWord<bool> _isInitialized; + std::shared_ptr<executor::TaskExecutor> _executor; +}; + + +namespace { +constexpr auto kMirrorMaestroName = "MirrorMaestro"_sd; +constexpr auto kMirrorMaestroThreadPoolMaxThreads = 2ull; + +const auto getMirrorMaestroImpl = ServiceContext::declareDecoration<MirrorMaestroImpl>(); +} // namespace + +void MirroredReadsServerParameter::append(OperationContext*, + BSONObjBuilder& bob, + const std::string& name) { + auto subBob = BSONObjBuilder(bob.subobjStart(name)); + _data->serialize(&subBob); +} + +Status MirroredReadsServerParameter::set(const BSONElement& value) try { + auto obj = value.Obj(); + + IDLParserErrorContext ctx(name()); + _data = MirroredReadsParameters::parse(ctx, obj); + + return Status::OK(); +} catch (const AssertionException& e) { + return e.toStatus(); +} + +Status MirroredReadsServerParameter::setFromString(const std::string&) { + using namespace fmt::literals; + auto msg = "{:s} cannot be set from a string."_format(name()); + return {ErrorCodes::BadValue, msg}; +} + +void MirrorMaestro::init(ServiceContext* serviceContext) noexcept { + auto replCoord = repl::ReplicationCoordinator::get(serviceContext); + invariant(replCoord); + if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { + // We only need a maestro if we're in a replica set + return; + } + + auto& impl = getMirrorMaestroImpl(serviceContext); + impl.init(serviceContext); +} + +void MirrorMaestro::shutdown(ServiceContext* serviceContext) noexcept { + auto& impl = getMirrorMaestroImpl(serviceContext); + impl.shutdown(); +} + +void MirrorMaestro::tryMirror(OperationContext* opCtx, + const OpMsgRequest& request, + const CommandInvocation* invocation) noexcept { + auto& impl = getMirrorMaestroImpl(opCtx->getServiceContext()); + impl.tryMirror(opCtx, request, invocation); +} + +void MirrorMaestroImpl::tryMirror(OperationContext* opCtx, + const OpMsgRequest& request, + const CommandInvocation* invocation) noexcept { + if (!_isInitialized.load()) { + // If we're not even available, nothing to do + return; + } + + if (!invocation->supportsReadMirroring()) { + // That's all, folks + return; + } + + // TODO SERVER-45816 will add the sampling function and attach the command + repl::IsMasterResponse* imr = nullptr; + if (!imr) { + // If we don't have an IsMasterResponse, we can't know where to send our mirrored + // request + return; + } + + MONGO_UNREACHABLE; +} + +void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept { + LOGV2_DEBUG(31452, 2, "Initializing MirrorMaestro"); + + // Until the end of this scope, no other thread can mutate _initGuard.liveness, so no other + // thread can be in the critical section of init() or shutdown(). + stdx::lock_guard lk(_initGuard.mutex); + switch (_initGuard.liveness) { + case Liveness::kUninitialized: { + // We can init + } break; + case Liveness::kRunning: { + // If someone else already initialized, do nothing + return; + } break; + case Liveness::kShutdown: { + LOGV2_DEBUG( + 31453, 2, "MirrorMaestro cannot initialize as it has already been shutdown"); + return; + } break; + }; + + auto makeNet = [&] { return executor::makeNetworkInterface(kMirrorMaestroName.toString()); }; + + auto makePool = [&] { + ThreadPool::Options options; + options.poolName = kMirrorMaestroName.toString(); + options.maxThreads = kMirrorMaestroThreadPoolMaxThreads; + return std::make_unique<ThreadPool>(std::move(options)); + }; + _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(makePool(), makeNet()); + + _executor->startup(); + + // Set _initGuard.liveness to kRunning + _initGuard.liveness = Liveness::kRunning; + + // Mark the maestro as initialized. It is now safe to call tryMirror(), use the _executor, or + // otherwise rely on members to be alive and well. + _isInitialized.store(true); +} + +void MirrorMaestroImpl::shutdown() noexcept { + LOGV2_DEBUG(31454, 2, "Shutting down MirrorMaestro"); + + // Until the end of this scope, no other thread can mutate _initGuard.liveness, so no other + // thread can be in the critical section of init() or shutdown(). + stdx::lock_guard lk(_initGuard.mutex); + switch (_initGuard.liveness) { + case Liveness::kUninitialized: + case Liveness::kShutdown: { + // If someone else already shutdown or we never init'd, do nothing + return; + } break; + case Liveness::kRunning: { + // Time to shut it all down + } break; + }; + + if (_executor) { + _executor->shutdown(); + } + + // Set _initGuard.liveness to kShutdown + _initGuard.liveness = Liveness::kShutdown; +} + +} // namespace mongo diff --git a/src/mongo/db/mirror_maestro.h b/src/mongo/db/mirror_maestro.h new file mode 100644 index 00000000000..da99be92787 --- /dev/null +++ b/src/mongo/db/mirror_maestro.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <vector> + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/platform/atomic_word.h" + +namespace mongo { + +class CommandInvocation; +struct OpMsgRequest; + +/** + * MirrorMaestro coordinates commands received by a replica set member and potentially + * sends a copy of the request to other members of replica set. + * + * All public functions are thread-safe. + */ +class MirrorMaestro { +public: + /** + * Initialize the MirrorMaestro for serviceContext + * + * This function blocks until the MirrorMaestro is available. + */ + static void init(ServiceContext* serviceContext) noexcept; + + /** + * Shutdown the MirrorMaestro for serviceContext + * + * This function blocks until the MirrorMaestro is no longer available. + */ + static void shutdown(ServiceContext* serviceContext) noexcept; + + /** + * Check if a given invocation+request should be mirrored to secondaries, and schedule that work + * if so. + * + * This function will noop if the MirrorMaestro is currently being initialized or shutdown. + */ + static void tryMirror(OperationContext* opCtx, + const OpMsgRequest& request, + const CommandInvocation* invocation) noexcept; +}; + +} // namespace mongo diff --git a/src/mongo/db/mirror_maestro.idl b/src/mongo/db/mirror_maestro.idl new file mode 100644 index 00000000000..aa058107391 --- /dev/null +++ b/src/mongo/db/mirror_maestro.idl @@ -0,0 +1,57 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: mongo + cpp_includes: + - "mongo/db/mirror_maestro.h" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + MirroredReadsParameters: + description: "A struct representing how to mirror read command requests" + strict: true + fields: + samplingRate: + description: >- + A floating-point value within [0.0,1.0] specifying what fraction of reads to sample + type: double + default: 0.0 + validator: + gte: 0.0 + lte: 1.0 + +server_parameters: + mirrorReads: + description: "How to mirror read command requests" + set_at: [ startup, runtime ] + cpp_class: + name: "MirroredReadsServerParameter" + data: "synchronized_value<MirroredReadsParameters>" + override_set: true diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index bd4165f2fe2..ccab76f4357 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -515,6 +515,7 @@ void _abortUnpreparedOrStashPreparedTransaction( } // namespace void invokeWithSessionCheckedOut(OperationContext* opCtx, + const OpMsgRequest& request, CommandInvocation* invocation, const OperationSessionInfoFromClient& sessionOptions, rpc::ReplyBuilderInterface* replyBuilder) { @@ -590,7 +591,7 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, }); try { - invocation->run(opCtx, replyBuilder); + CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) { // Exceptions are used to resolve views in a sharded cluster, so they should be handled // specially to avoid unnecessary aborts. @@ -743,9 +744,10 @@ bool runCommandImpl(OperationContext* opCtx, errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled."); replyBuilder->setCommandReply(errorBuilder.obj()); } else if (shouldCheckOutSession) { - invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder); + invokeWithSessionCheckedOut( + opCtx, request, invocation, sessionOptions, replyBuilder); } else { - invocation->run(opCtx, replyBuilder); + CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); } } catch (const DBException& ex) { // Do no-op write before returning NoSuchTransaction if command has writeConcern. @@ -769,9 +771,9 @@ bool runCommandImpl(OperationContext* opCtx, } else { behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); if (shouldCheckOutSession) { - invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder); + invokeWithSessionCheckedOut(opCtx, request, invocation, sessionOptions, replyBuilder); } else { - invocation->run(opCtx, replyBuilder); + CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); } } diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index e130e65b707..81104682c3c 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -167,6 +167,7 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res * Invokes the given command and aborts the transaction on any non-retryable errors. */ void invokeInTransactionRouter(OperationContext* opCtx, + const OpMsgRequest& request, CommandInvocation* invocation, rpc::ReplyBuilderInterface* result) { auto txnRouter = TransactionRouter::get(opCtx); @@ -176,7 +177,7 @@ void invokeInTransactionRouter(OperationContext* opCtx, txnRouter.setDefaultAtClusterTime(opCtx); try { - invocation->run(opCtx, result); + CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); } catch (const DBException& e) { if (ErrorCodes::isSnapshotError(e.code()) || ErrorCodes::isNeedRetargettingError(e.code()) || @@ -273,9 +274,9 @@ void execCommandClient(OperationContext* opCtx, auto txnRouter = TransactionRouter::get(opCtx); if (txnRouter) { - invokeInTransactionRouter(opCtx, invocation, result); + invokeInTransactionRouter(opCtx, request, invocation, result); } else { - invocation->run(opCtx, result); + CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); } if (invocation->supportsWriteConcern()) { |