From a0fce10bb34cb04455f02948352f5eb37b38ad75 Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Thu, 17 Sep 2020 17:36:35 +0000 Subject: SERVER-49107 Add support for async execution to MongoD command path --- src/mongo/base/error_codes.yml | 4 + src/mongo/db/commands.cpp | 24 + src/mongo/db/commands.h | 50 + src/mongo/db/repl/tenant_migration_donor_util.cpp | 54 + src/mongo/db/repl/tenant_migration_donor_util.h | 43 +- src/mongo/db/request_execution_context.h | 131 ++ src/mongo/db/service_entry_point_common.cpp | 1888 ++++++++++++-------- src/mongo/db/service_entry_point_common.h | 4 +- src/mongo/db/service_entry_point_mongod.cpp | 8 +- .../embedded/service_entry_point_embedded.cpp | 4 +- 10 files changed, 1387 insertions(+), 823 deletions(-) create mode 100644 src/mongo/db/request_execution_context.h diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 3324135c668..ee1d27ea6b8 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -399,6 +399,10 @@ error_codes: - {code: 329, name: TenantMigrationInProgress} + - {code: 330, name: SkipCommandExecution} + + - {code: 331, name: FailedToRunWithReplyBuilder} + # Error codes 4000-8999 are reserved. 
# Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index a2c7a804538..83fba136be7 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -185,6 +185,20 @@ void CommandHelpers::runCommandInvocation(OperationContext* opCtx, } } +Future CommandHelpers::runCommandInvocationAsync( + std::shared_ptr rec, + std::shared_ptr invocation) try { + auto hooks = getCommandInvocationHooksHandle(rec->getOpCtx()->getServiceContext()); + if (hooks) + hooks->onBeforeAsyncRun(rec, invocation.get()); + return invocation->runAsync(rec).then([rec, hooks, invocation] { + if (hooks) + hooks->onAfterAsyncRun(rec, invocation.get()); + }); +} catch (const DBException& e) { + return e.toStatus(); +} + void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx, const CommandInvocation* invocation, const OpMsgRequest& request, @@ -801,6 +815,16 @@ private: } } + Future runAsync(std::shared_ptr rec) override { + return _command->runAsync(rec, _dbName).onError([rec](Status status) { + if (status.code() != ErrorCodes::FailedToRunWithReplyBuilder) + return status; + BSONObjBuilder bob = rec->getReplyBuilder()->getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bob, false); + return Status::OK(); + }); + } + void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index d1cf2b369c8..e1805991cab 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -30,6 +30,7 @@ #pragma once #include +#include #include #include #include @@ -47,10 +48,12 @@ #include "mongo/db/query/explain.h" #include "mongo/db/read_concern_support_result.h" #include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/request_execution_context.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/op_msg.h" #include "mongo/rpc/reply_builder_interface.h" #include 
"mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/string_map.h" namespace mongo { @@ -91,12 +94,28 @@ public: const OpMsgRequest& request, CommandInvocation* invocation) = 0; + /** + * A behavior to perform before CommandInvocation::runAsync(). Defaults to `onBeforeRun(...)`. + */ + virtual void onBeforeAsyncRun(std::shared_ptr rec, + CommandInvocation* invocation) { + onBeforeRun(rec->getOpCtx(), rec->getRequest(), invocation); + } + /** * A behavior to perform after CommandInvocation::run() */ virtual void onAfterRun(OperationContext* opCtx, const OpMsgRequest& request, CommandInvocation* invocation) = 0; + + /** + * A behavior to perform after CommandInvocation::runAsync(). Defaults to `onAfterRun(...)`. + */ + virtual void onAfterAsyncRun(std::shared_ptr rec, + CommandInvocation* invocation) { + onAfterRun(rec->getOpCtx(), rec->getRequest(), invocation); + } }; // Various helpers unrelated to any single command or to the command registry. @@ -235,6 +254,15 @@ struct CommandHelpers { CommandInvocation* invocation, rpc::ReplyBuilderInterface* response); + /** + * Runs a previously parsed command and propagates the result to the ReplyBuilderInterface. For + * commands that do not offer an implementation tailored for asynchronous execution, the future + * schedules the execution of the default implementation, historically designed for synchronous + * execution. + */ + static Future runCommandInvocationAsync(std::shared_ptr rec, + std::shared_ptr invocation); + /** * If '!invocation', we're logging about a Command pre-parse. It has to punt on the logged * namespace, giving only the request's $db. Since the Command hasn't parsed the request body, @@ -568,6 +596,16 @@ public: */ virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0; + /** + * Returns a future that can schedule asynchronous execution of the command. 
By default, the + * future falls back to the execution of `run(...)`, thus the default semantics of + * `runAsync(...)` is identical to that of `run(...)`. + */ + virtual Future runAsync(std::shared_ptr rec) { + run(rec->getOpCtx(), rec->getReplyBuilder()); + return Status::OK(); + } + virtual void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) { @@ -704,6 +742,18 @@ public: const BSONObj& cmdObj, rpc::ReplyBuilderInterface* replyBuilder) = 0; + /** + * Provides a future that may run the command asynchronously. By default, it falls back to + * runWithReplyBuilder. + */ + virtual Future runAsync(std::shared_ptr rec, std::string db) { + if (!runWithReplyBuilder( + rec->getOpCtx(), db, rec->getRequest().body, rec->getReplyBuilder())) + return Status(ErrorCodes::FailedToRunWithReplyBuilder, + fmt::format("Failed to run command: {}", rec->getCommand()->getName())); + return Status::OK(); + } + /** * Commands which can be explained override this method. Any operation which has a query * part and executes as a tree of execution stages can be explained. 
A command should diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index eb0e257d045..fc98af33f70 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -43,6 +43,7 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" @@ -222,6 +223,59 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { }); } +class MigrationConflictHandler : public std::enable_shared_from_this { +public: + MigrationConflictHandler(std::shared_ptr rec, + unique_function()> callable) + : _rec(std::move(rec)), _callable(std::move(callable)) {} + + Future run() try { + checkIfCanReadOrBlock(_rec->getOpCtx(), _rec->getRequest().getDatabase()); + // callable will modify replyBuilder. + return _callable() + .then([this, anchor = shared_from_this()] { _checkReplyForTenantMigrationConflict(); }) + .onError( + [this, anchor = shared_from_this()](Status status) { + _handleTenantMigrationConflict(std::move(status)); + }); + } catch (const DBException& e) { + return e.toStatus(); + } + +private: + void _checkReplyForTenantMigrationConflict() { + auto replyBodyBuilder = _rec->getReplyBuilder()->getBodyBuilder(); + + // getStatusFromWriteCommandReply expects an 'ok' field. + CommandHelpers::extractOrAppendOk(replyBodyBuilder); + + // Commands such as insert, update, delete, and applyOps return the result as a status + // rather than throwing. + const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); + + // Only throw `TenantMigrationConflict` exceptions. 
+ if (status == ErrorCodes::TenantMigrationConflict) + internalAssert(status); + } + + void _handleTenantMigrationConflict(Status status) { + auto migrationConflictInfo = status.extraInfo(); + invariant(migrationConflictInfo); + + if (auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker()) { + uassertStatusOK(mtab->waitUntilCommittedOrAborted(_rec->getOpCtx())); + } + } + + const std::shared_ptr _rec; + const unique_function()> _callable; +}; + +Future migrationConflictHandler(std::shared_ptr rec, + unique_function()> callable) { + return std::make_shared(std::move(rec), std::move(callable))->run(); +} + } // namespace tenant_migration_donor } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 71b522b9657..ead99e97bd2 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -35,9 +35,12 @@ #include "mongo/db/repl/tenant_migration_access_blocker_registry.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/request_execution_context.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/reply_builder_interface.h" +#include "mongo/util/functional.h" +#include "mongo/util/future.h" namespace mongo { @@ -81,41 +84,13 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName); void recoverTenantMigrationAccessBlockers(OperationContext* opCtx); /** - * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated - * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then - * throws TenantMigrationCommitted or TenantMigrationAborted. + * Returns a future that asynchronously schedules and runs the argument function 'callable'. 
If it + * throws a TenantMigrationConflict error (as indicated in 'replyBuilder'), clears 'replyBuilder' + * and blocks until the migration commits or aborts, then returns TenantMigrationCommitted or + * TenantMigrationAborted. */ -template -void migrationConflictHandler(OperationContext* opCtx, - StringData dbName, - Callable&& callable, - rpc::ReplyBuilderInterface* replyBuilder) { - checkIfCanReadOrBlock(opCtx, dbName); - - try { - // callable will modify replyBuilder. - callable(); - auto replyBodyBuilder = replyBuilder->getBodyBuilder(); - - // getStatusFromWriteCommandReply expects an 'ok' field. - CommandHelpers::extractOrAppendOk(replyBodyBuilder); - - // applyOps returns the result as a status rather than throwing. - const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); - - if (status == ErrorCodes::TenantMigrationConflict) { - uassertStatusOK(status); - } - return; - } catch (const TenantMigrationConflictException& ex) { - auto migrationConflictInfo = ex.extraInfo(); - invariant(migrationConflictInfo); - - if (auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker()) { - uassertStatusOK(mtab->waitUntilCommittedOrAborted(opCtx)); - } - } -} +Future migrationConflictHandler(std::shared_ptr rec, + unique_function()> callable); } // namespace tenant_migration_donor diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h new file mode 100644 index 00000000000..b34c7e4f3b7 --- /dev/null +++ b/src/mongo/db/request_execution_context.h @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include + +#include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" +#include "mongo/rpc/message.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/rpc/reply_builder_interface.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +class Command; + +/** + * Captures the execution context for a command to allow safe, shared and asynchronous accesses. + * This class owns all objects that participate in command execution (e.g., `request`). The only + * exceptions are `opCtx` and `command`. The `opCtx` remains valid so long as its corresponding + * client is attached to the executor thread. In case of `command`, it is a global, static + * construct and is safe to be accessed through raw pointers. + * Any access from a client thread that does not own the `opCtx`, or after the `opCtx` is + * released is strictly forbidden. 
+ */ +class RequestExecutionContext { +public: + RequestExecutionContext() = delete; + RequestExecutionContext(const RequestExecutionContext&) = delete; + RequestExecutionContext(RequestExecutionContext&&) = delete; + + explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {} + + auto getOpCtx() const { + invariant(_isOnClientThread()); + return _opCtx; + } + + void setMessage(Message message) { + invariant(_isOnClientThread() && !_message); + _message = std::move(message); + _dbmsg = std::make_unique(_message.get()); + } + const Message& getMessage() const { + invariant(_isOnClientThread() && _message); + return _message.get(); + } + DbMessage& getDbMessage() const { + invariant(_isOnClientThread() && _dbmsg); + return *_dbmsg.get(); + } + + void setRequest(OpMsgRequest request) { + invariant(_isOnClientThread() && !_request); + _request = std::move(request); + } + const OpMsgRequest& getRequest() const { + invariant(_isOnClientThread() && _request); + return _request.get(); + } + + void setCommand(Command* command) { + invariant(_isOnClientThread() && !_command); + _command = command; + } + Command* getCommand() const { + invariant(_isOnClientThread()); + return _command; + } + + void setReplyBuilder(std::unique_ptr replyBuilder) { + invariant(_isOnClientThread() && !_replyBuilder); + _replyBuilder = std::move(replyBuilder); + } + auto getReplyBuilder() const { + invariant(_isOnClientThread() && _replyBuilder); + return _replyBuilder.get(); + } + + void setResponse(DbResponse response) { + invariant(_isOnClientThread()); + _response = std::move(response); + } + DbResponse& getResponse() { + invariant(_isOnClientThread()); + return _response; + } + +private: + bool _isOnClientThread() const { + return _opCtx != nullptr && Client::getCurrent() == _opCtx->getClient(); + } + + OperationContext* const _opCtx; + boost::optional _message; + std::unique_ptr _dbmsg; + boost::optional _request; + Command* _command = nullptr; + std::unique_ptr 
_replyBuilder; + DbResponse _response; +}; + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 5a4e9fddef6..b70b330acb9 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -72,6 +72,7 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/tenant_migration_donor_util.h" +#include "mongo/db/request_execution_context.h" #include "mongo/db/run_op_kill_cursors.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" @@ -129,58 +130,77 @@ namespace { using namespace fmt::literals; -/** Allows for the very complex handleRequest function to be decomposed into parts. */ +/* + * Allows for the very complex handleRequest function to be decomposed into parts. + * It also provides the infrastructure to futurize the process of executing commands. + */ struct HandleRequest { - struct OpRunner { - explicit OpRunner(HandleRequest* hr) : hr{hr} {} - virtual ~OpRunner() = default; - virtual DbResponse run() = 0; - HandleRequest* hr; - }; + // Maintains the context (e.g., opCtx and replyBuilder) required for command execution. 
+ class ExecutionContext final : public RequestExecutionContext { + public: + ExecutionContext(OperationContext* opCtx, + Message msg, + std::unique_ptr hooks) + : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) { + // It also initializes dbMessage, which is accessible via getDbMessage() + setMessage(std::move(msg)); + } + ~ExecutionContext() = default; - HandleRequest(OperationContext* opCtx, - const Message& m, - const ServiceEntryPointCommon::Hooks& behaviors) - : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {} + Client& client() const { + return *getOpCtx()->getClient(); + } - DbResponse run(); + auto session() const { + return client().session(); + } - NetworkOp op() const { - return m.operation(); - } + NetworkOp op() const { + return getMessage().operation(); + } - CurOp& currentOp() { - return *CurOp::get(opCtx); - } + CurOp& currentOp() { + return *CurOp::get(getOpCtx()); + } - NamespaceString nsString() const { - if (!dbmsg.messageShouldHaveNs()) - return {}; - return NamespaceString(dbmsg.getns()); - } + NamespaceString nsString() const { + auto& dbmsg = getDbMessage(); + if (!dbmsg.messageShouldHaveNs()) + return {}; + return NamespaceString(dbmsg.getns()); + } - void assertValidNsString() { - if (!nsString().isValid()) { - uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false); + void assertValidNsString() { + if (!nsString().isValid()) { + uassert( + 16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false); + } } - } - std::unique_ptr makeOpRunner(); - void startOperation(); - void completeOperation(const DbResponse& dbresponse); + std::unique_ptr behaviors; + boost::optional slowMsOverride; + bool forceLog = false; + }; - Client& client() const { - return *opCtx->getClient(); - } + struct OpRunner { + explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext} {} + virtual ~OpRunner() = default; + virtual Future run() = 0; + std::shared_ptr executionContext; + 
}; - OperationContext* opCtx; - const Message& m; - const ServiceEntryPointCommon::Hooks& behaviors; + HandleRequest(OperationContext* opCtx, + const Message& msg, + std::unique_ptr behaviors) + : executionContext(std::make_shared( + opCtx, const_cast(msg), std::move(behaviors))) {} + + std::unique_ptr makeOpRunner(); - DbMessage dbmsg; + Future startOperation(); + Future completeOperation(); - boost::optional slowMsOverride; - bool forceLog = false; + std::shared_ptr executionContext; }; void generateLegacyQueryErrorResponse(const AssertionException& exception, @@ -253,22 +273,23 @@ void generateLegacyQueryErrorResponse(const AssertionException& exception, response->setData(bb.release()); } -void registerError(OperationContext* opCtx, const DBException& exception) { - LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); - CurOp::get(opCtx)->debug().errInfo = exception.toStatus(); +void registerError(OperationContext* opCtx, const Status& status) { + LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason()); + CurOp::get(opCtx)->debug().errInfo = status; } void generateErrorResponse(OperationContext* opCtx, rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, + const Status& status, const BSONObj& replyMetadata, BSONObj extraFields = {}) { - registerError(opCtx, exception); + invariant(!status.isOK()); + registerError(opCtx, status); // We could have thrown an exception after setting fields in the builder, // so we need to reset it to a clean state just to be sure. 
replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus(), extraFields); + replyBuilder->setCommandReply(status, extraFields); replyBuilder->getBodyBuilder().appendElements(replyMetadata); } @@ -548,14 +569,248 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx, topologyVersion.serialize(&topologyVersionBuilder); } -void _abortUnpreparedOrStashPreparedTransaction( - OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) { - const bool isPrepared = txnParticipant->transactionIsPrepared(); +class ExecCommandDatabase : public std::enable_shared_from_this { +public: + explicit ExecCommandDatabase(std::shared_ptr execContext) + : _execContext(std::move(execContext)) {} + + static Future run(std::shared_ptr execContext) { + return std::make_shared(std::move(execContext))->_makeFutureChain(); + } + + std::shared_ptr getExecutionContext() { + return _execContext; + } + std::shared_ptr getInvocation() { + return _invocation; + } + const OperationSessionInfoFromClient& getSessionOptions() const { + return _sessionOptions; + } + BSONObjBuilder* getExtraFieldsBuilder() { + return &_extraFieldsBuilder; + } + const LogicalTime& getStartOperationTime() const { + return _startOperationTime; + } + + bool isHello() const { + return _execContext->getCommand()->getName() == "hello"_sd || + _execContext->getCommand()->getName() == "isMaster"_sd; + } + +private: + // Returns a future that executes a command after stripping metadata, performing authorization + // checks, handling audit impersonation, and (potentially) setting maintenance mode. The future + // also checks that the command is permissible to run on the node given its current replication + // state. All the logic here is independent of any particular command; any functionality + // relevant to a specific command should be confined to its run() method. 
+ Future _makeFutureChain() { + return _parseCommand().then([this, anchor = shared_from_this()] { + return _initiateCommand() + .then([this] { return _commandExec(); }) + .onError([this, anchor = shared_from_this()](Status status) { + return _handleFailure(std::move(status)); + }); + }); + } + + Future _parseCommand() { + auto pf = makePromiseFuture(); + auto future = std::move(pf.future).then([this, anchor = shared_from_this()] { + auto opCtx = _execContext->getOpCtx(); + auto command = _execContext->getCommand(); + auto& request = _execContext->getRequest(); + + CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); + _startOperationTime = getClientOperationTime(opCtx); + + _invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, _invocation); + + const auto session = _execContext->getOpCtx()->getClient()->session(); + if (session) { + if (!opCtx->isExhaust() || !isHello()) { + InExhaustHello::get(session.get()) + ->setInExhaust(false, request.getCommandName()); + } + } + + // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. + if (isHello()) { + _execContext->slowMsOverride = + 2 * durationCount(SingleServerIsMasterMonitor::kMaxAwaitTime); + } + }); + pf.promise.emplaceValue(); + return future; + } + + // Any logic, such as authorization and auditing, that must precede execution of the command. + Future _initiateCommand(); + + // Returns the future chain that executes the parsed command against the database. + Future _commandExec(); + + // Any error-handling logic that must be performed if the command initiation/execution fails. 
+ void _handleFailure(Status status); + + bool _isInternalClient() const { + return _execContext->session() && + _execContext->session()->getTags() & transport::Session::kInternalClient; + } + + const std::shared_ptr _execContext; + + // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share + // execution state without worrying about the lifetime of these variables. + BSONObjBuilder _extraFieldsBuilder; + std::shared_ptr _invocation; + LogicalTime _startOperationTime; + OperationSessionInfoFromClient _sessionOptions; + std::unique_ptr _scoped; +}; + +class RunCommandImpl : public std::enable_shared_from_this { +public: + explicit RunCommandImpl(std::shared_ptr ecd) + : _ecd(std::move(ecd)), + _shouldCheckOutSession( + _ecd->getSessionOptions().getTxnNumber() && + !shouldCommandSkipSessionCheckout(_ecd->getInvocation()->definition()->getName())), + _shouldWaitForWriteConcern(_ecd->getInvocation()->supportsWriteConcern() || + _ecd->getInvocation()->definition()->getLogicalOp() == + LogicalOp::opGetMore) {} + + static Future run(std::shared_ptr ecd) { + return std::make_shared(std::move(ecd))->_makeFutureChain(); + } + +private: + Future _makeFutureChain(); + + // Anchor for references to attributes defined in `ExecCommandDatabase` (e.g., sessionOptions). + const std::shared_ptr _ecd; + + // Any code that must run before command execution (e.g., reserving bytes for reply builder). 
+ Future _prologue(); + + // Runs the command without waiting for write concern + Future _runCommand(); + + class RunCommandAndWaitForWriteConcern { + public: + explicit RunCommandAndWaitForWriteConcern(std::shared_ptr rci) + : _rci(std::move(rci)), + _execContext(_rci->_ecd->getExecutionContext()), + _oldWriteConcern(_execContext->getOpCtx()->getWriteConcern()) {} + + ~RunCommandAndWaitForWriteConcern() { + _execContext->getOpCtx()->setWriteConcern(_oldWriteConcern); + } + + static Future run(std::shared_ptr); + + private: + void _waitForWriteConcern(BSONObjBuilder& bb); + + void _setup(); + Future _run(); + Future _onRunCompletion(Status); + + const std::shared_ptr _rci; + const std::shared_ptr _execContext; + + // Allows changing the write concern while running the command and resetting on destruction. + const WriteConcernOptions _oldWriteConcern; + boost::optional _lastOpBeforeRun; + boost::optional _extractedWriteConcern; + }; + + // Any code that must run after command execution -- returns true on successful execution. + Future _epilogue(); + + bool _isInternalClient() const { + auto session = _ecd->getExecutionContext()->session(); + return session && session->getTags() & transport::Session::kInternalClient; + } + + // Whether invoking the command requires a session to be checked out. + const bool _shouldCheckOutSession; + + // getMore operations inherit a WriteConcern from their originating cursor. For example, if the + // originating command was an aggregate with a $out and batchSize: 0. Note that if the command + // only performed reads then we will not need to wait at all. + const bool _shouldWaitForWriteConcern; +}; + +// Simplifies the interface for invoking commands and allows asynchronous execution of command +// invocations. 
+class InvokeCommand : public std::enable_shared_from_this { +public: + explicit InvokeCommand(std::shared_ptr ecd) : _ecd(std::move(ecd)) {} + + Future run(const bool checkoutSession); + +private: + class SessionCheckoutPath; + + Future _runInvocation() noexcept; + + const std::shared_ptr _ecd; +}; + +class InvokeCommand::SessionCheckoutPath + : public std::enable_shared_from_this { +public: + SessionCheckoutPath(std::shared_ptr parent) : _parent(std::move(parent)) {} + + Future run(); + +private: + void _cleanupIncompleteTxn(); + + Future _checkOutSession(); + void _tapError(Status); + Future _commitInvocation(); + + const std::shared_ptr _parent; + + std::unique_ptr _sessionTxnState; + boost::optional _txnParticipant; + boost::optional>> _guard; +}; + +Future InvokeCommand::run(const bool checkoutSession) { + auto [past, present] = makePromiseFuture(); + auto future = std::move(present).then([this, checkoutSession, anchor = shared_from_this()] { + if (checkoutSession) + return std::make_shared(std::move(anchor))->run(); + return _runInvocation(); + }); + past.emplaceValue(); + return future; +} + +Future InvokeCommand::SessionCheckoutPath::run() { + auto anchor = shared_from_this(); + return makeReadyFutureWith([] {}) + .then([this, anchor] { return _checkOutSession(); }) + .then([this, anchor] { + return _parent->_runInvocation().tapError( + [this, anchor](Status status) { return _tapError(std::move(status)); }); + }) + .then([this, anchor] { return _commitInvocation(); }); +} + +void InvokeCommand::SessionCheckoutPath::_cleanupIncompleteTxn() { + auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx(); + const bool isPrepared = _txnParticipant->transactionIsPrepared(); try { if (isPrepared) - txnParticipant->stashTransactionResources(opCtx); - else if (txnParticipant->transactionIsOpen()) - txnParticipant->abortTransaction(opCtx); + _txnParticipant->stashTransactionResources(opCtx); + else if (_txnParticipant->transactionIsOpen()) + 
_txnParticipant->abortTransaction(opCtx); } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. LOGV2_FATAL_CONTINUE(21974, @@ -570,27 +825,18 @@ void _abortUnpreparedOrStashPreparedTransaction( } } -void invokeWithNoSession(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - rpc::ReplyBuilderInterface* replyBuilder) { - tenant_migration_donor::migrationConflictHandler( - opCtx, - request.getDatabase(), - [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, - replyBuilder); -} +Future InvokeCommand::SessionCheckoutPath::_checkOutSession() { + auto ecd = _parent->_ecd; + auto execContext = ecd->getExecutionContext(); + auto opCtx = execContext->getOpCtx(); + CommandInvocation* invocation = ecd->getInvocation().get(); + const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions(); -void invokeWithSessionCheckedOut(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - const OperationSessionInfoFromClient& sessionOptions, - rpc::ReplyBuilderInterface* replyBuilder) { // This constructor will check out the session. It handles the appropriate state management // for both multi-statement transactions and retryable writes. Currently, only requests with // a transaction number will check out the session. - MongoDOperationContextSession sessionTxnState(opCtx); - auto txnParticipant = TransactionParticipant::get(opCtx); + _sessionTxnState = std::make_unique(opCtx); + _txnParticipant.emplace(TransactionParticipant::get(opCtx)); if (!opCtx->getClient()->isInDirectClient()) { bool beganOrContinuedTxn{false}; @@ -598,13 +844,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, // transaction on that session. 
while (!beganOrContinuedTxn) { try { - txnParticipant.beginOrContinue(opCtx, - *sessionOptions.getTxnNumber(), - sessionOptions.getAutocommit(), - sessionOptions.getStartTransaction()); + _txnParticipant->beginOrContinue(opCtx, + *sessionOptions.getTxnNumber(), + sessionOptions.getAutocommit(), + sessionOptions.getStartTransaction()); beganOrContinuedTxn = true; } catch (const ExceptionFor&) { - auto prepareCompleted = txnParticipant.onExitPrepare(); + auto prepareCompleted = _txnParticipant->onExitPrepare(); CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitAfterNewStatementBlocksBehindPrepare, @@ -636,21 +882,19 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, // transactions on failure to unstash the transaction resources to opCtx. We don't want to // have this error guard for beginOrContinue as it can abort the transaction for any // accidental invalid statements in the transaction. - auto abortOnError = makeGuard([&txnParticipant, opCtx] { - if (txnParticipant.transactionIsInProgress()) { - txnParticipant.abortTransaction(opCtx); + auto abortOnError = makeGuard([&] { + if (_txnParticipant->transactionIsInProgress()) { + _txnParticipant->abortTransaction(opCtx); } }); - txnParticipant.unstashTransactionResources(opCtx, invocation->definition()->getName()); + _txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName()); // Unstash success. 
abortOnError.dismiss(); } - auto guard = makeGuard([opCtx, &txnParticipant] { - _abortUnpreparedOrStashPreparedTransaction(opCtx, &txnParticipant); - }); + _guard.emplace([this] { _cleanupIncompleteTxn(); }); if (!opCtx->getClient()->isInDirectClient()) { const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); @@ -675,23 +919,31 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, (cmdName == "create"_sd || cmdName == "createIndexes"_sd)) { if (!readConcernSupport.readConcernSupport.isOK()) { uassertStatusOK(readConcernSupport.readConcernSupport.withContext( - str::stream() << "Command " << cmdName - << " does not support this transaction's " - << readConcernArgs.toString())); + "Command {} does not support this transaction's {}"_format( + cmdName, readConcernArgs.toString()))); } } } // Use the API parameters that were stored when the transaction was initiated. - APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx); + APIParameters::get(opCtx) = _txnParticipant->getAPIParameters(opCtx); - try { - tenant_migration_donor::migrationConflictHandler( - opCtx, - request.getDatabase(), - [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, - replyBuilder); - } catch (const ExceptionFor&) { + return Status::OK(); +} + +Future InvokeCommand::_runInvocation() noexcept { + auto execContext = _ecd->getExecutionContext(); + return tenant_migration_donor::migrationConflictHandler( + execContext, [execContext, invocation = _ecd->getInvocation()] { + return CommandHelpers::runCommandInvocationAsync(std::move(execContext), + std::move(invocation)); + }); +} + +void InvokeCommand::SessionCheckoutPath::_tapError(Status status) { + auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx(); + const OperationSessionInfoFromClient& sessionOptions = _parent->_ecd->getSessionOptions(); + if (status.code() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { // Exceptions are used to resolve views in a sharded 
cluster, so they should be handled // specially to avoid unnecessary aborts. @@ -704,49 +956,47 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, // avoid leaving it orphaned in this case, which is fine even if it is re-targeted // because the retry will include "startTransaction" again and "restart" a transaction // at the active txnNumber. - throw; + return; } // If this shard has completed an earlier statement for this transaction, it must already be // in the transaction's participant list, so it is guaranteed to learn its outcome. - txnParticipant.stashTransactionResources(opCtx); - guard.dismiss(); - throw; - } catch (const ExceptionFor&) { - txnParticipant.stashTransactionResources(opCtx); - txnParticipant.resetRetryableWriteState(opCtx); - guard.dismiss(); - throw; + _txnParticipant->stashTransactionResources(opCtx); + _guard->dismiss(); + } else if (status.code() == ErrorCodes::WouldChangeOwningShard) { + _txnParticipant->stashTransactionResources(opCtx); + _txnParticipant->resetRetryableWriteState(opCtx); + _guard->dismiss(); } +} +Future InvokeCommand::SessionCheckoutPath::_commitInvocation() { + auto execContext = _parent->_ecd->getExecutionContext(); + auto replyBuilder = execContext->getReplyBuilder(); if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) { // If ok is present, use its truthiness. if (!okField.trueValue()) { - return; + return Status::OK(); } } // Stash or commit the transaction when the command succeeds. 
- txnParticipant.stashTransactionResources(opCtx); - guard.dismiss(); + _txnParticipant->stashTransactionResources(execContext->getOpCtx()); + _guard->dismiss(); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - auto txnResponseMetadata = txnParticipant.getResponseMetadata(); + auto txnResponseMetadata = _txnParticipant->getResponseMetadata(); auto bodyBuilder = replyBuilder->getBodyBuilder(); txnResponseMetadata.serialize(&bodyBuilder); } + return Status::OK(); } -bool runCommandImpl(OperationContext* opCtx, - CommandInvocation* invocation, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - LogicalTime startOperationTime, - const ServiceEntryPointCommon::Hooks& behaviors, - BSONObjBuilder* extraFieldsBuilder, - const OperationSessionInfoFromClient& sessionOptions) { - const Command* command = invocation->definition(); +Future RunCommandImpl::_prologue() try { + auto execContext = _ecd->getExecutionContext(); + auto opCtx = execContext->getOpCtx(); + const Command* command = _ecd->getInvocation()->definition(); auto bytesToReserve = command->reserveBytesForReply(); // SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the // additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency @@ -755,19 +1005,7 @@ bool runCommandImpl(OperationContext* opCtx, if (kDebugBuild) bytesToReserve = 0; #endif - replyBuilder->reserveBytes(bytesToReserve); - - const auto isInternalClient = opCtx->getClient()->session() && - (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); - - const bool shouldCheckOutSession = - sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName()); - - // getMore operations inherit a WriteConcern from their originating cursor. For example, if the - // originating command was an aggregate with a $out and batchSize: 0. 
Note that if the command - // only performed reads then we will not need to wait at all. - const bool shouldWaitForWriteConcern = - invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore; + execContext->getReplyBuilder()->reserveBytes(bytesToReserve); // Record readConcern usages for commands run outside of transactions, excluding DBDirectClient. // For commands inside a transaction, they inherit the readConcern from the transaction. So we @@ -777,139 +1015,18 @@ bool runCommandImpl(OperationContext* opCtx, ServerReadConcernMetrics::get(opCtx)->recordReadConcern(repl::ReadConcernArgs::get(opCtx), false /* isTransaction */); } + return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); +} - if (shouldWaitForWriteConcern) { - auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - - boost::optional extractedWriteConcern; - if (command->getLogicalOp() == LogicalOp::opGetMore) { - // WriteConcern will be set up during command processing, it must not be specified on - // the command body. - behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - } else { - // WriteConcern should always be explicitly specified by operations received on shard - // and config servers, even if it is empty (ie. writeConcern: {}). In this context - // (shard/config servers) an empty WC indicates the operation should use the implicit - // server defaults. So, warn if the operation has not specified writeConcern and is on - // a shard/config server. - if (!opCtx->getClient()->isInDirectClient() && - (!opCtx->inMultiDocumentTransaction() || - isTransactionCommand(command->getName()))) { - if (isInternalClient) { - // WriteConcern should always be explicitly specified by operations received - // from internal clients (ie. 
from a mongos or mongod), even if it is empty - // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }). - uassert( - 4569201, - "received command without explicit writeConcern on an internalClient connection {}"_format( - redact(request.body.toString())), - request.body.hasField(WriteConcernOptions::kWriteConcernField)); - } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || - serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) { - // TODO: Disabled until after SERVER-44539, to avoid log spam. - // LOGV2(21959, "Missing writeConcern on {command}", "Missing " - // "writeConcern on command", "command"_attr = command->getName()); - } - } - } - extractedWriteConcern.emplace( - uassertStatusOK(extractWriteConcern(opCtx, request.body, isInternalClient))); - if (sessionOptions.getAutocommit()) { - validateWriteConcernForTransaction(*extractedWriteConcern, - invocation->definition()->getName()); - } - - // Ensure that the WC being set on the opCtx has provenance. - invariant(extractedWriteConcern->getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on writeConcern: " - << extractedWriteConcern->toBSON()); - - opCtx->setWriteConcern(*extractedWriteConcern); - } - - auto waitForWriteConcern = [&](auto&& bb) { - bool reallyWait = true; - failCommand.executeIf( - [&](const BSONObj& data) { - bb.append(data["writeConcernError"]); - reallyWait = false; - if (data.hasField(kErrorLabelsFieldName) && - data[kErrorLabelsFieldName].type() == Array) { - // Propagate error labels specified in the failCommand failpoint to the - // OperationContext decoration to override getErrorLabels() behaviors. 
- invariant(!errorLabelsOverride(opCtx)); - errorLabelsOverride(opCtx).emplace( - data.getObjectField(kErrorLabelsFieldName).getOwned()); - } - }, - [&](const BSONObj& data) { - return CommandHelpers::shouldActivateFailCommandFailPoint( - data, invocation, opCtx->getClient()) && - data.hasField("writeConcernError"); - }); - if (reallyWait) { - CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern()); - behaviors.waitForWriteConcern(opCtx, invocation, lastOpBeforeRun, bb); - } - }; - - try { - if (auto scoped = failWithErrorCodeInRunCommand.scoped(); - MONGO_unlikely(scoped.isActive())) { - const auto errorCode = scoped.getData()["errorCode"].numberInt(); - LOGV2(21960, - "failWithErrorCodeInRunCommand enabled - failing command with error " - "code: {errorCode}", - "failWithErrorCodeInRunCommand enabled, failing command", - "errorCode"_attr = errorCode); - BSONObjBuilder errorBuilder; - errorBuilder.append("ok", 0.0); - errorBuilder.append("code", errorCode); - errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled."); - replyBuilder->setCommandReply(errorBuilder.obj()); - } else if (shouldCheckOutSession) { - invokeWithSessionCheckedOut( - opCtx, request, invocation, sessionOptions, replyBuilder); - } else { - invokeWithNoSession(opCtx, request, invocation, replyBuilder); - } - } catch (const DBException& ex) { - // Do no-op write before returning NoSuchTransaction if command has writeConcern. - if (ex.toStatus().code() == ErrorCodes::NoSuchTransaction && - !opCtx->getWriteConcern().usedDefault) { - TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error"); - } - waitForWriteConcern(*extraFieldsBuilder); - throw; - } - - waitForWriteConcern(replyBuilder->getBodyBuilder()); - - // With the exception of getMores inheriting the WriteConcern from the originating command, - // nothing in run() should change the writeConcern. 
- if (command->getLogicalOp() == LogicalOp::opGetMore) { - dassert(!extractedWriteConcern, - "opGetMore contained unexpected extracted write concern"); - } else { - dassert(extractedWriteConcern, "no extracted write concern"); - dassert(opCtx->getWriteConcern() == extractedWriteConcern, - "opCtx wc: {} extracted wc: {}"_format( - opCtx->getWriteConcern().toBSON().jsonString(), - extractedWriteConcern->toBSON().jsonString())); - } - } else { - behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - if (shouldCheckOutSession) { - invokeWithSessionCheckedOut(opCtx, request, invocation, sessionOptions, replyBuilder); - } else { - invokeWithNoSession(opCtx, request, invocation, replyBuilder); - } - } +Future RunCommandImpl::_epilogue() { + auto execContext = _ecd->getExecutionContext(); + auto opCtx = execContext->getOpCtx(); + auto& request = execContext->getRequest(); + auto command = execContext->getCommand(); + auto replyBuilder = execContext->getReplyBuilder(); + auto& behaviors = *execContext->behaviors; // This fail point blocks all commands which are running on the specified namespace, or which // are present in the given list of commands.If no namespace or command list are provided,then @@ -926,7 +1043,7 @@ bool runCommandImpl(OperationContext* opCtx, // If 'ns' or 'commands' is not set, block for all the namespaces or commands // respectively. 
- return (ns.empty() || invocation->ns().ns() == ns) && + return (ns.empty() || _ecd->getInvocation()->ns().ns() == ns) && (commands.empty() || std::any_of(commands.begin(), commands.end(), [&request](auto& element) { return element.valueStringDataSafe() == request.getCommandName(); @@ -957,413 +1074,569 @@ bool runCommandImpl(OperationContext* opCtx, if (response.hasField("writeConcernError")) { wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); } - appendErrorLabelsAndTopologyVersion( - opCtx, &body, sessionOptions, command->getName(), code, wcCode, isInternalClient); + appendErrorLabelsAndTopologyVersion(opCtx, + &body, + _ecd->getSessionOptions(), + command->getName(), + code, + wcCode, + _isInternalClient()); } auto commandBodyBob = replyBuilder->getBodyBuilder(); behaviors.appendReplyMetadata(opCtx, request, &commandBodyBob); - appendClusterAndOperationTime(opCtx, &commandBodyBob, &commandBodyBob, startOperationTime); + appendClusterAndOperationTime( + opCtx, &commandBodyBob, &commandBodyBob, _ecd->getStartOperationTime()); return ok; } -/** - * Executes a command after stripping metadata, performing authorization checks, - * handling audit impersonation, and (potentially) setting maintenance mode. This method - * also checks that the command is permissible to run on the node given its current - * replication state. All the logic here is independent of any particular command; any - * functionality relevant to a specific command should be confined to its run() method. 
- */ -void execCommandDatabase(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - const ServiceEntryPointCommon::Hooks& behaviors) { - CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); - BSONObjBuilder extraFieldsBuilder; - auto startOperationTime = getClientOperationTime(opCtx); +Future RunCommandImpl::_runCommand() { + auto execContext = _ecd->getExecutionContext(); + invariant(!_shouldWaitForWriteConcern); + execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern( + execContext->getRequest().body); + return std::make_shared(_ecd)->run(_shouldCheckOutSession); +} - std::shared_ptr invocation = command->parse(opCtx, request); - CommandInvocation::set(opCtx, invocation); +void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) { + auto invocation = _rci->_ecd->getInvocation().get(); + auto opCtx = _execContext->getOpCtx(); + if (auto scoped = failCommand.scopedIf([&](const BSONObj& obj) { + return CommandHelpers::shouldActivateFailCommandFailPoint( + obj, invocation, opCtx->getClient()) && + obj.hasField("writeConcernError"); + }); + MONGO_unlikely(scoped.isActive())) { + const BSONObj& data = scoped.getData(); + bb.append(data["writeConcernError"]); + if (data.hasField(kErrorLabelsFieldName) && data[kErrorLabelsFieldName].type() == Array) { + // Propagate error labels specified in the failCommand failpoint to the + // OperationContext decoration to override getErrorLabels() behaviors. 
+ invariant(!errorLabelsOverride(opCtx)); + errorLabelsOverride(opCtx).emplace( + data.getObjectField(kErrorLabelsFieldName).getOwned()); + } + return; + } - OperationSessionInfoFromClient sessionOptions; + CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern()); + _execContext->behaviors->waitForWriteConcern(opCtx, invocation, _lastOpBeforeRun.get(), bb); +} - const auto isInternalClient = opCtx->getClient()->session() && - (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::run( + std::shared_ptr rci) { + auto instance = std::make_shared(std::move(rci)); + // `_setup()` runs inline as part of preparing the future-chain, which will run the command and + // waits for write concern, and may throw. + instance->_setup(); + auto pf = makePromiseFuture(); + auto future = std::move(pf.future) + .then([instance] { return instance->_run(); }) + .onCompletion([instance](Status status) { + return instance->_onRunCompletion(std::move(status)); + }); + pf.promise.emplaceValue(); + return future; +} - const auto isHello = command->getName() == "hello"_sd || command->getName() == "isMaster"_sd; +void RunCommandImpl::RunCommandAndWaitForWriteConcern::_setup() { + auto invocation = _rci->_ecd->getInvocation(); + OperationContext* opCtx = _execContext->getOpCtx(); + const Command* command = invocation->definition(); + const OpMsgRequest& request = _execContext->getRequest(); - try { - const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); - Client* client = opCtx->getClient(); + _lastOpBeforeRun.emplace(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); - { - stdx::lock_guard lk(*client); - CurOp::get(opCtx)->setCommand_inlock(command); - APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient); + if (command->getLogicalOp() == LogicalOp::opGetMore) { + // WriteConcern will be set up during command processing, it 
must not be specified on + // the command body. + _execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(request.body); + } else { + // WriteConcern should always be explicitly specified by operations received on shard + // and config servers, even if it is empty (ie. writeConcern: {}). In this context + // (shard/config servers) an empty WC indicates the operation should use the implicit + // server defaults. So, warn if the operation has not specified writeConcern and is on + // a shard/config server. + if (!opCtx->getClient()->isInDirectClient() && + (!opCtx->inMultiDocumentTransaction() || isTransactionCommand(command->getName()))) { + if (_rci->_isInternalClient()) { + // WriteConcern should always be explicitly specified by operations received + // from internal clients (ie. from a mongos or mongod), even if it is empty + // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }). + uassert( + 4569201, + "received command without explicit writeConcern on an internalClient connection {}"_format( + redact(request.body.toString())), + request.body.hasField(WriteConcernOptions::kWriteConcernField)); + } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || + serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) { + // TODO: Disabled until after SERVER-44539, to avoid log spam. + // LOGV2(21959, "Missing writeConcern on {command}", "Missing " + // "writeConcern on command", "command"_attr = command->getName()); + } + } } - - if (isHello) { - // Preload generic ClientMetadata ahead of our first hello request. After the first - // request, metaElement should always be empty. 
- auto metaElem = request.body[kMetadataDocumentName]; - ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem); + _extractedWriteConcern.emplace( + uassertStatusOK(extractWriteConcern(opCtx, request.body, _rci->_isInternalClient()))); + if (_rci->_ecd->getSessionOptions().getAutocommit()) { + validateWriteConcernForTransaction(*_extractedWriteConcern, + invocation->definition()->getName()); } - auto& apiParams = APIParameters::get(opCtx); - auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); - if (auto clientMetadata = ClientMetadata::get(client)) { - auto appName = clientMetadata->getApplicationName().toString(); - apiVersionMetrics.update(appName, apiParams); + // Ensure that the WC being set on the opCtx has provenance. + invariant(_extractedWriteConcern->getProvenance().hasSource(), + fmt::format("unexpected unset provenance on writeConcern: {}", + _extractedWriteConcern->toBSON().jsonString())); + + opCtx->setWriteConcern(*_extractedWriteConcern); + } +} + +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() { + if (auto scoped = failWithErrorCodeInRunCommand.scoped(); MONGO_unlikely(scoped.isActive())) { + const auto errorCode = scoped.getData()["errorCode"].numberInt(); + LOGV2(21960, + "failWithErrorCodeInRunCommand enabled - failing command with error " + "code: {errorCode}", + "failWithErrorCodeInRunCommand enabled, failing command", + "errorCode"_attr = errorCode); + BSONObjBuilder errorBuilder; + errorBuilder.append("ok", 0.0); + errorBuilder.append("code", errorCode); + errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled."); + _execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj()); + return Status::OK(); + } + return std::make_shared(_rci->_ecd)->run(_rci->_shouldCheckOutSession); +} + +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) { + auto opCtx = _execContext->getOpCtx(); + if (!status.isOK()) { + // Do no-op write before returning 
NoSuchTransaction if command has writeConcern. + if (status.code() == ErrorCodes::NoSuchTransaction && + !opCtx->getWriteConcern().usedDefault) { + TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error"); } + _waitForWriteConcern(*_rci->_ecd->getExtraFieldsBuilder()); + return status; + } - sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) { - auto numMillis = data["millis"].numberInt(); - auto commands = data["commands"].Obj().getFieldNames>(); - // Only sleep for one of the specified commands. - if (commands.find(command->getName()) != commands.end()) { - mongo::sleepmillis(numMillis); - } - }); + auto bb = _execContext->getReplyBuilder()->getBodyBuilder(); + _waitForWriteConcern(bb); - rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); + // With the exception of getMores inheriting the WriteConcern from the originating command, + // nothing in run() should change the writeConcern. 
+ if (_execContext->getCommand()->getLogicalOp() == LogicalOp::opGetMore) { + dassert(!_extractedWriteConcern, "opGetMore contained unexpected extracted write concern"); + } else { + dassert(_extractedWriteConcern, "no extracted write concern"); + dassert( + opCtx->getWriteConcern() == _extractedWriteConcern, + "opCtx wc: {} extracted wc: {}"_format(opCtx->getWriteConcern().toBSON().jsonString(), + _extractedWriteConcern->toBSON().jsonString())); + } + return status; +} - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); +Future RunCommandImpl::_makeFutureChain() { + return _prologue() + .then([this] { + if (_shouldWaitForWriteConcern) + return RunCommandAndWaitForWriteConcern::run(shared_from_this()); + else + return _runCommand(); + }) + .then([this] { return _epilogue(); }) + .onCompletion( + [this, anchor = shared_from_this()](StatusWith ranSuccessfully) -> Future { + // Failure to run a command is either indicated by throwing an exception or adding a + // non-okay field to the replyBuilder. The input argument (i.e., `ranSuccessfully`) + // captures both cases. On success, it holds an okay status and a `true` value. 
+ auto status = ranSuccessfully.getStatus(); + if (status.isOK() && ranSuccessfully.getValue()) + return Status::OK(); + + auto execContext = _ecd->getExecutionContext(); + execContext->getCommand()->incrementCommandsFailed(); + if (status.code() == ErrorCodes::Unauthorized) { + CommandHelpers::auditLogAuthEvent(execContext->getOpCtx(), + _ecd->getInvocation().get(), + execContext->getRequest(), + status.code()); + } + return status; + }); +} - sessionOptions = initializeOperationSessionInfo( - opCtx, - request.body, - command->requiresAuth(), - command->attachLogicalSessionsToOpCtx(), - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet); +Future ExecCommandDatabase::_initiateCommand() try { + auto opCtx = _execContext->getOpCtx(); + auto& request = _execContext->getRequest(); + auto command = _execContext->getCommand(); + auto replyBuilder = _execContext->getReplyBuilder(); - CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); + const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); + Client* client = opCtx->getClient(); - const auto dbname = request.getDatabase().toString(); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", - NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + { + stdx::lock_guard lk(*client); + CurOp::get(opCtx)->setCommand_inlock(command); + APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient); + } - ResourceConsumption::ScopedMetricsCollector scopedMetrics( - opCtx, dbname, command->collectsResourceConsumptionMetrics()); - - const auto allowTransactionsOnConfigDatabase = - (serverGlobalParams.clusterRole == ClusterRole::ConfigServer || - serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - validateSessionOptions(sessionOptions, - command->getName(), - invocation->ns(), - allowTransactionsOnConfigDatabase); - - std::unique_ptr mmSetter; - - 
BSONElement cmdOptionMaxTimeMSField; - BSONElement maxTimeMSOpOnlyField; - BSONElement allowImplicitCollectionCreationField; - BSONElement helpField; - - StringMap topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { - cmdOptionMaxTimeMSField = element; - } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) { - uassert(ErrorCodes::InvalidOptions, - "Can not specify maxTimeMSOpOnly for non internal clients", - isInternalClient); - maxTimeMSOpOnlyField = element; - } else if (fieldName == "allowImplicitCollectionCreation") { - allowImplicitCollectionCreationField = element; - } else if (fieldName == CommandHelpers::kHelpFieldName) { - helpField = element; - } else if (fieldName == "comment") { - opCtx->setComment(element.wrap()); - } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { - uasserted(ErrorCodes::InvalidOptions, - "no such command option $maxTimeMs; use maxTimeMS instead"); - } + if (isHello()) { + // Preload generic ClientMetadata ahead of our first hello request. After the first + // request, metaElement should always be empty. 
+ auto metaElem = request.body[kMetadataDocumentName]; + ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem); + } - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } + auto& apiParams = APIParameters::get(opCtx); + auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); + if (auto clientMetadata = ClientMetadata::get(client)) { + auto appName = clientMetadata->getApplicationName().toString(); + apiVersionMetrics.update(appName, apiParams); + } - if (CommandHelpers::isHelpRequest(helpField)) { - CurOp::get(opCtx)->ensureStarted(); - // We disable last-error for help requests due to SERVER-11492, because config - // servers use help requests to determine which commands are database writes, and so - // must be forwarded to all config servers. - LastError::get(opCtx->getClient()).disable(); - Command::generateHelpResponse(opCtx, replyBuilder, *command); - return; + sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) { + auto numMillis = data["millis"].numberInt(); + auto commands = data["commands"].Obj().getFieldNames>(); + // Only sleep for one of the specified commands. 
+ if (commands.find(command->getName()) != commands.end()) { + mongo::sleepmillis(numMillis); } + }); - ImpersonationSessionGuard guard(opCtx); - invocation->checkAuthorization(opCtx, request); + rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); + rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); - const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!opCtx->getClient()->isInDirectClient() && - !MONGO_unlikely(skipCheckingForNotPrimaryInCommandDispatch.shouldFail())) { - const bool inMultiDocumentTransaction = (sessionOptions.getAutocommit() == false); - auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); - bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; - bool couldHaveOptedIn = - allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction; - bool optedIn = - couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); - bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction); - if (!canRunHere && couldHaveOptedIn) { - const auto msg = client->supportsHello() ? "not primary and secondaryOk=false"_sd - : "not master and slaveOk=false"_sd; - uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, msg); - } + _sessionOptions = initializeOperationSessionInfo(opCtx, + request.body, + command->requiresAuth(), + command->attachLogicalSessionsToOpCtx(), + replCoord->getReplicationMode() == + repl::ReplicationCoordinator::modeReplSet); - if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) { - uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere); - } else { - const auto msg = client->supportsHello() ? 
"not primary"_sd : "not master"_sd; - uassert(ErrorCodes::NotWritablePrimary, msg, canRunHere); - } + CommandHelpers::evaluateFailCommandFailPoint(opCtx, _invocation.get()); - if (!command->maintenanceOk() && - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && - !replCoord->getMemberState().secondary()) { - - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is recovering", - !replCoord->getMemberState().recovering()); - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is not in primary or recovering state", - replCoord->getMemberState().primary()); - // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is in drain mode", - optedIn || alwaysAllowed); - } - } + const auto dbname = request.getDatabase().toString(); + uassert(ErrorCodes::InvalidNamespace, + fmt::format("Invalid database name: '{}'", dbname), + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + + ResourceConsumption::ScopedMetricsCollector scopedMetrics( + opCtx, dbname, command->collectsResourceConsumptionMetrics()); + + const auto allowTransactionsOnConfigDatabase = + (serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + serverGlobalParams.clusterRole == ClusterRole::ShardServer); - if (command->adminOnly()) { - LOGV2_DEBUG(21961, - 2, - "Admin only command: {command}", - "Admin only command", - "command"_attr = request.getCommandName()); + validateSessionOptions( + _sessionOptions, command->getName(), _invocation->ns(), allowTransactionsOnConfigDatabase); + + std::unique_ptr mmSetter; + + BSONElement cmdOptionMaxTimeMSField; + BSONElement maxTimeMSOpOnlyField; + BSONElement allowImplicitCollectionCreationField; + BSONElement helpField; + + StringMap topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == 
QueryRequest::cmdOptionMaxTimeMS) { + cmdOptionMaxTimeMSField = element; + } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) { + uassert(ErrorCodes::InvalidOptions, + "Can not specify maxTimeMSOpOnly for non internal clients", + _isInternalClient()); + maxTimeMSOpOnlyField = element; + } else if (fieldName == "allowImplicitCollectionCreation") { + allowImplicitCollectionCreationField = element; + } else if (fieldName == CommandHelpers::kHelpFieldName) { + helpField = element; + } else if (fieldName == "comment") { + opCtx->setComment(element.wrap()); + } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { + uasserted(ErrorCodes::InvalidOptions, + "no such command option $maxTimeMs; use maxTimeMS instead"); } - if (command->maintenanceMode()) { - mmSetter.reset(new MaintenanceModeSetter(opCtx)); + uassert(ErrorCodes::FailedToParse, + str::stream() << "Parsed command object contains duplicate top level key: " + << fieldName, + topLevelFields[fieldName]++ == 0); + } + + if (CommandHelpers::isHelpRequest(helpField)) { + CurOp::get(opCtx)->ensureStarted(); + // We disable last-error for help requests due to SERVER-11492, because config servers use + // help requests to determine which commands are database writes, and so must be forwarded + // to all config servers. 
+ LastError::get(opCtx->getClient()).disable(); + Command::generateHelpResponse(opCtx, replyBuilder, *command); + return Status(ErrorCodes::SkipCommandExecution, + "Skipping command execution for help request"); + } + + ImpersonationSessionGuard guard(opCtx); + _invocation->checkAuthorization(opCtx, request); + + const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); + + if (!opCtx->getClient()->isInDirectClient() && + !MONGO_unlikely(skipCheckingForNotPrimaryInCommandDispatch.shouldFail())) { + const bool inMultiDocumentTransaction = (_sessionOptions.getAutocommit() == false); + auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); + bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; + bool couldHaveOptedIn = + allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction; + bool optedIn = couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); + bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction); + if (!canRunHere && couldHaveOptedIn) { + const auto msg = client->supportsHello() ? "not primary and secondaryOk=false"_sd + : "not master and slaveOk=false"_sd; + uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, msg); } - if (command->shouldAffectCommandCounter()) { - OpCounters* opCounters = &globalOpCounters; - opCounters->gotCommand(); + if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) { + uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere); + } else { + const auto msg = client->supportsHello() ? "not primary"_sd : "not master"_sd; + uassert(ErrorCodes::NotWritablePrimary, msg, canRunHere); } - // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation - // on the OperationContext. 
The 'maxTimeMS' option unfortunately has a different meaning - // for a getMore command, where it is used to communicate the maximum time to wait for - // new inserts on tailable cursors, not as a deadline for the operation. - // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will - // require introducing a new 'max await time' parameter for getMore, and eventually - // banning maxTimeMS altogether on a getMore command. - int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); - int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField)); - - // The "hello" or "isMaster" commands should not inherit the deadline from the user op it is - // operating as a part of as that can interfere with replica set monitoring and host - // selection. - const bool ignoreMaxTimeMSOpOnly = isHello; - - if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && - command->getLogicalOp() != LogicalOp::opGetMore) { - uassert(40119, - "Illegal attempt to set operation deadline within DBDirectClient", - !opCtx->getClient()->isInDirectClient()); - if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 && - (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) { - opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS}); - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly}, - ErrorCodes::MaxTimeMSExpired); - } else if (maxTimeMS > 0) { - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired); - } + if (!command->maintenanceOk() && + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && + !replCoord->getMemberState().secondary()) { + + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is recovering", + !replCoord->getMemberState().recovering()); + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is not in primary or recovering state", + replCoord->getMemberState().primary()); + // Check ticket 
SERVER-21432, slaveOk commands are allowed in drain mode + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is in drain mode", + optedIn || alwaysAllowed); } + } - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + if (command->adminOnly()) { + LOGV2_DEBUG(21961, + 2, + "Admin only command: {command}", + "Admin only command", + "command"_attr = request.getCommandName()); + } - // If the parent operation runs in a transaction, we don't override the read concern. - auto skipReadConcern = - opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction(); - bool startTransaction = static_cast(sessionOptions.getStartTransaction()); - if (!skipReadConcern) { - auto newReadConcernArgs = uassertStatusOK(_extractReadConcern( - opCtx, invocation.get(), request.body, startTransaction, isInternalClient)); + if (command->maintenanceMode()) { + mmSetter.reset(new MaintenanceModeSetter(opCtx)); + } - // Ensure that the RC being set on the opCtx has provenance. - invariant(newReadConcernArgs.getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on readConcern: " - << newReadConcernArgs.toBSONInner()); + if (command->shouldAffectCommandCounter()) { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + } - uassert(ErrorCodes::InvalidOptions, - "Only the first command in a transaction may specify a readConcern", - startTransaction || !opCtx->inMultiDocumentTransaction() || - newReadConcernArgs.isEmpty()); - - { - // We must obtain the client lock to set the ReadConcernArgs on the operation - // context as it may be concurrently read by CurrentOp. - stdx::lock_guard lk(*opCtx->getClient()); - readConcernArgs = std::move(newReadConcernArgs); - } + // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on the + // OperationContext. 
The 'maxTimeMS' option unfortunately has a different meaning for a getMore + // command, where it is used to communicate the maximum time to wait for new inserts on tailable + // cursors, not as a deadline for the operation. + // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will require + // introducing a new 'max await time' parameter for getMore, and eventually banning maxTimeMS + // altogether on a getMore command. + int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); + int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField)); + + // The "hello" command should not inherit the deadline from the user op it is operating as a + // part of as that can interfere with replica set monitoring and host selection. + bool ignoreMaxTimeMSOpOnly = isHello(); + + if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && command->getLogicalOp() != LogicalOp::opGetMore) { + uassert(40119, + "Illegal attempt to set operation deadline within DBDirectClient", + !opCtx->getClient()->isInDirectClient()); + if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 && + (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) { + opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS}); + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly}, + ErrorCodes::MaxTimeMSExpired); + } else if (maxTimeMS > 0) { + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired); } + } - if (startTransaction) { - opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true); - opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - } + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + + // If the parent operation runs in a transaction, we don't override the read concern. 
+ auto skipReadConcern = + opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction(); + bool startTransaction = static_cast(_sessionOptions.getStartTransaction()); + if (!skipReadConcern) { + auto newReadConcernArgs = uassertStatusOK(_extractReadConcern( + opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient())); + + // Ensure that the RC being set on the opCtx has provenance. + invariant(newReadConcernArgs.getProvenance().hasSource(), + str::stream() << "unexpected unset provenance on readConcern: " + << newReadConcernArgs.toBSONInner()); - if (opCtx->inMultiDocumentTransaction() && !startTransaction) { - uassert(4937700, - "API parameters are only allowed in the first command of a multi-document " - "transaction", - !APIParameters::get(opCtx).getParamsPassed()); + uassert(ErrorCodes::InvalidOptions, + "Only the first command in a transaction may specify a readConcern", + startTransaction || !opCtx->inMultiDocumentTransaction() || + newReadConcernArgs.isEmpty()); + + { + // We must obtain the client lock to set the ReadConcernArgs on the operation context as + // it may be concurrently read by CurrentOp. + stdx::lock_guard lk(*opCtx->getClient()); + readConcernArgs = std::move(newReadConcernArgs); } + } - // Remember whether or not this operation is starting a transaction, in case something - // later in the execution needs to adjust its behavior based on this. 
- opCtx->setIsStartingMultiDocumentTransaction(startTransaction); + if (startTransaction) { + opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + } - auto& oss = OperationShardingState::get(opCtx); + if (opCtx->inMultiDocumentTransaction() && !startTransaction) { + uassert(4937700, + "API parameters are only allowed in the first command of a multi-document " + "transaction", + !APIParameters::get(opCtx).getParamsPassed()); + } - if (!opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && - (iAmPrimary || - (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body); - - auto const shardingState = ShardingState::get(opCtx); - if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { - uassertStatusOK(shardingState->canAcceptShardedCommands()); - } + // Remember whether or not this operation is starting a transaction, in case something later in + // the execution needs to adjust its behavior based on this. 
+ opCtx->setIsStartingMultiDocumentTransaction(startTransaction); - behaviors.advanceConfigOpTimeFromRequestMetadata(opCtx); + auto& oss = OperationShardingState::get(opCtx); + + if (!opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && + (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { + oss.initializeClientRoutingVersionsFromCommand(_invocation->ns(), request.body); + + auto const shardingState = ShardingState::get(opCtx); + if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { + uassertStatusOK(shardingState->canAcceptShardedCommands()); } - oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); - auto scoped = behaviors.scopedOperationCompletionShardingActions(opCtx); + _execContext->behaviors->advanceConfigOpTimeFromRequestMetadata(opCtx); + } - // This may trigger the maxTimeAlwaysTimeOut failpoint. - auto status = opCtx->checkForInterruptNoAssert(); + oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); + _scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx); - // We still proceed if the primary stepped down, but accept other kinds of - // interruptions. We defer to individual commands to allow themselves to be - // interruptible by stepdowns, since commands like 'voteRequest' should conversely - // continue executing. - if (status != ErrorCodes::PrimarySteppedDown && - status != ErrorCodes::InterruptedDueToReplStateChange) { - uassertStatusOK(status); - } + // This may trigger the maxTimeAlwaysTimeOut failpoint. + auto status = opCtx->checkForInterruptNoAssert(); + // We still proceed if the primary stepped down, but accept other kinds of interruptions. We + // defer to individual commands to allow themselves to be interruptible by stepdowns, since + // commands like 'voteRequest' should conversely continue executing. 
+ if (status != ErrorCodes::PrimarySteppedDown && + status != ErrorCodes::InterruptedDueToReplStateChange) { + uassertStatusOK(status); + } - CurOp::get(opCtx)->ensureStarted(); + CurOp::get(opCtx)->ensureStarted(); - command->incrementCommandsExecuted(); - - if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) && - rpc::TrackingMetadata::get(opCtx).getParentOperId()) { - LOGV2_DEBUG_OPTIONS(4615605, - 1, - {logv2::LogComponent::kTracking}, - "Command metadata: {trackingMetadata}", - "Command metadata", - "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx)); - rpc::TrackingMetadata::get(opCtx).setIsLogged(true); - } + command->incrementCommandsExecuted(); - behaviors.waitForReadConcern(opCtx, invocation.get(), request); - behaviors.setPrepareConflictBehaviorForReadConcern(opCtx, invocation.get()); - - try { - if (!runCommandImpl(opCtx, - invocation.get(), - request, - replyBuilder, - startOperationTime, - behaviors, - &extraFieldsBuilder, - sessionOptions)) { - command->incrementCommandsFailed(); - } - } catch (const DBException& e) { - command->incrementCommandsFailed(); - if (e.code() == ErrorCodes::Unauthorized) { - CommandHelpers::auditLogAuthEvent(opCtx, invocation.get(), request, e.code()); - } - throw; - } - } catch (const DBException& e) { - behaviors.handleException(e, opCtx); + if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) && + rpc::TrackingMetadata::get(opCtx).getParentOperId()) { + LOGV2_DEBUG_OPTIONS(4615605, + 1, + {logv2::LogComponent::kTracking}, + "Command metadata: {trackingMetadata}", + "Command metadata", + "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx)); + rpc::TrackingMetadata::get(opCtx).setIsLogged(true); + } - // Append the error labels for transient transaction errors. 
- auto response = extraFieldsBuilder.asTempObj(); - boost::optional wcCode; - if (response.hasField("writeConcernError")) { - wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); - } - appendErrorLabelsAndTopologyVersion(opCtx, - &extraFieldsBuilder, - sessionOptions, - command->getName(), - e.code(), - wcCode, - isInternalClient); + _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request); + _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get()); + return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); +} - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadata(opCtx, request, &metadataBob); +Future ExecCommandDatabase::_commandExec() { + return RunCommandImpl::run(shared_from_this()); +} - // The read concern may not have yet been placed on the operation context, so attempt to - // parse it here, so if it is valid it can be used to compute the proper operationTime. - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.isEmpty()) { - auto readConcernArgsStatus = _extractReadConcern(opCtx, - invocation.get(), - request.body, - false /*startTransaction*/, - isInternalClient); - if (readConcernArgsStatus.isOK()) { - // We must obtain the client lock to set the ReadConcernArgs on the operation - // context as it may be concurrently read by CurrentOp. - stdx::lock_guard lk(*opCtx->getClient()); - readConcernArgs = readConcernArgsStatus.getValue(); - } - } - appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, startOperationTime); +void ExecCommandDatabase::_handleFailure(Status status) { + // Absorb the exception as the command execution has already been skipped. 
+ if (status.code() == ErrorCodes::SkipCommandExecution) + return; - LOGV2_DEBUG(21962, - 1, - "Assertion while executing command '{command}' on database '{db}' with " - "arguments '{commandArgs}': {error}", - "Assertion while executing command", - "command"_attr = request.getCommandName(), - "db"_attr = request.getDatabase(), - "commandArgs"_attr = redact( - ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)), - "error"_attr = redact(e.toString())); + auto opCtx = _execContext->getOpCtx(); + auto& request = _execContext->getRequest(); + auto command = _execContext->getCommand(); + auto replyBuilder = _execContext->getReplyBuilder(); + const auto& behaviors = *_execContext->behaviors; - generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), extraFieldsBuilder.obj()); + behaviors.handleException(status, opCtx); - if (ErrorCodes::isA(e.code())) { - // Rethrow the exception to the top to signal that the client connection should be - // closed. - throw; + // Append the error labels for transient transaction errors. + auto response = _extraFieldsBuilder.asTempObj(); + boost::optional wcCode; + if (response.hasField("writeConcernError")) { + wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); + } + appendErrorLabelsAndTopologyVersion(opCtx, + &_extraFieldsBuilder, + _sessionOptions, + command->getName(), + status.code(), + wcCode, + _isInternalClient()); + + BSONObjBuilder metadataBob; + behaviors.appendReplyMetadata(opCtx, request, &metadataBob); + + // The read concern may not have yet been placed on the operation context, so attempt to parse + // it here, so if it is valid it can be used to compute the proper operationTime. 
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + if (readConcernArgs.isEmpty()) { + auto readConcernArgsStatus = _extractReadConcern(opCtx, + _invocation.get(), + request.body, + false /*startTransaction*/, + _isInternalClient()); + if (readConcernArgsStatus.isOK()) { + // We must obtain the client lock to set the ReadConcernArgs on the operation context as + // it may be concurrently read by CurrentOp. + stdx::lock_guard lk(*opCtx->getClient()); + readConcernArgs = readConcernArgsStatus.getValue(); } } + appendClusterAndOperationTime(opCtx, &_extraFieldsBuilder, &metadataBob, _startOperationTime); + + LOGV2_DEBUG(21962, + 1, + "Assertion while executing command '{command}' on database '{db}' with " + "arguments '{commandArgs}': {error}", + "Assertion while executing command", + "command"_attr = request.getCommandName(), + "db"_attr = request.getDatabase(), + "commandArgs"_attr = redact( + ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)), + "error"_attr = redact(status.toString())); + + generateErrorResponse( + opCtx, replyBuilder, status, metadataBob.obj(), _extraFieldsBuilder.obj()); + + if (ErrorCodes::isA(status.code())) { + // Rethrow the exception to the top to signal that the client connection should be closed. + internalAssert(status); + } } /** @@ -1383,114 +1656,94 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { curop->setNS_inlock(nss.ns()); } -DbResponse receivedCommands(OperationContext* opCtx, - const Message& message, - const ServiceEntryPointCommon::Hooks& behaviors) { - auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); - OpMsgRequest request; - Command* c = nullptr; - [&] { - try { // Parse. - request = rpc::opMsgRequestFromAnyProtocol(message); - } catch (const DBException& ex) { - // If this error needs to fail the connection, propagate it out. 
- if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) - throw; - - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadataOnError(opCtx, &metadataBob); - - BSONObjBuilder extraFieldsBuilder; - appendClusterAndOperationTime( - opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); - - // Otherwise, reply with the parse error. This is useful for cases where parsing fails - // due to user-supplied input, such as the document too deep error. Since we failed - // during parsing, we can't log anything about the command. - LOGV2_DEBUG(21963, - 1, - "Assertion while parsing command: {error}", - "Assertion while parsing command", - "error"_attr = ex.toString()); - - generateErrorResponse( - opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj()); - - return; // From lambda. Don't try executing if parsing failed. - } - - try { // Execute. - curOpCommandSetup(opCtx, request); - - // In the absence of a Command object, no redaction is possible. Therefore - // to avoid displaying potentially sensitive information in the logs, - // we restrict the log message to the name of the unrecognized command. - // However, the complete command object will still be echoed to the client. - if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { - globalCommandRegistry()->incrementUnknownCommands(); - std::string msg = str::stream() - << "no such command: '" << request.getCommandName() << "'"; - LOGV2_DEBUG(21964, - 2, - "No such command: {command}", - "Command not found in registry", - "command"_attr = request.getCommandName()); - uasserted(ErrorCodes::CommandNotFound, str::stream() << msg); - } - - LOGV2_DEBUG(21965, - 2, - "Run command {db}.$cmd {commandArgs}", - "About to run the command", - "db"_attr = request.getDatabase(), - "commandArgs"_attr = redact( - ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body))); - - { - // Try to set this as early as possible, as soon as we have figured out the - // command. 
- stdx::lock_guard lk(*opCtx->getClient()); - CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); - } - - opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)); +Future parseCommand(std::shared_ptr execContext) try { + execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage())); + return Status::OK(); +} catch (const DBException& ex) { + // Need to set request as `makeCommandResponse` expects an empty request on failure. + execContext->setRequest({}); + + // Otherwise, reply with the parse error. This is useful for cases where parsing fails due to + // user-supplied input, such as the document too deep error. Since we failed during parsing, we + // can't log anything about the command. + LOGV2_DEBUG(21963, + 1, + "Assertion while parsing command: {error}", + "Assertion while parsing command", + "error"_attr = ex.toString()); + + return ex.toStatus(); +} - const auto session = opCtx->getClient()->session(); - if (session) { - if (!opCtx->isExhaust() || - (c->getName() != "hello"_sd && c->getName() != "isMaster"_sd)) { - InExhaustHello::get(session.get()) - ->setInExhaust(false, request.getCommandName()); +Future executeCommand(std::shared_ptr execContext) { + auto [past, present] = makePromiseFuture(); + auto future = + std::move(present) + .then([execContext]() -> Future { + // Prepare environment for command execution (e.g., find command object in registry) + auto opCtx = execContext->getOpCtx(); + auto& request = execContext->getRequest(); + curOpCommandSetup(opCtx, request); + + // In the absence of a Command object, no redaction is possible. Therefore to avoid + // displaying potentially sensitive information in the logs, we restrict the log + // message to the name of the unrecognized command. However, the complete command + // object will still be echoed to the client. 
+ if (execContext->setCommand(CommandHelpers::findCommand(request.getCommandName())); + !execContext->getCommand()) { + globalCommandRegistry()->incrementUnknownCommands(); + LOGV2_DEBUG(21964, + 2, + "No such command: {command}", + "Command not found in registry", + "command"_attr = request.getCommandName()); + return Status(ErrorCodes::CommandNotFound, + fmt::format("no such command: '{}'", request.getCommandName())); } - } - execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors); - } catch (const DBException& ex) { - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadataOnError(opCtx, &metadataBob); + Command* c = execContext->getCommand(); + LOGV2_DEBUG( + 21965, + 2, + "Run command {db}.$cmd {commandArgs}", + "About to run the command", + "db"_attr = request.getDatabase(), + "commandArgs"_attr = redact( + ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body))); - BSONObjBuilder extraFieldsBuilder; - appendClusterAndOperationTime( - opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); + { + // Try to set this as early as possible, as soon as we have figured out the + // command. 
+ stdx::lock_guard lk(*opCtx->getClient()); + CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); + } - LOGV2_DEBUG(21966, - 1, - "Assertion while executing command '{command}' on database '{db}': {error}", - "Assertion while executing command", - "command"_attr = request.getCommandName(), - "db"_attr = request.getDatabase(), - "error"_attr = ex.toString()); + opCtx->setExhaust( + OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported)); - generateErrorResponse( - opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj()); + return Status::OK(); + }) + .then([execContext] { return ExecCommandDatabase::run(std::move(execContext)); }) + .tapError([execContext](Status status) { + LOGV2_DEBUG( + 21966, + 1, + "Assertion while executing command '{command}' on database '{db}': {error}", + "Assertion while executing command", + "command"_attr = execContext->getRequest().getCommandName(), + "db"_attr = execContext->getRequest().getDatabase(), + "error"_attr = status.toString()); + }); + past.emplaceValue(); + return future; +} - if (ErrorCodes::isA(ex.code())) { - // Rethrow the exception to the top to signal that the client connection should be - // closed. - throw; - } - } - }(); +DbResponse makeCommandResponse(std::shared_ptr execContext) { + auto opCtx = execContext->getOpCtx(); + const Message& message = execContext->getMessage(); + OpMsgRequest request = execContext->getRequest(); + const Command* c = execContext->getCommand(); + auto replyBuilder = execContext->getReplyBuilder(); if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { // Close the connection to get client to go through server selection again. 
@@ -1524,6 +1777,38 @@ DbResponse receivedCommands(OperationContext* opCtx, return dbResponse; } +Future receivedCommands(std::shared_ptr execContext) { + execContext->setReplyBuilder( + rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage()))); + return parseCommand(execContext) + .then([execContext]() { return executeCommand(std::move(execContext)); }) + .onError([execContext](Status status) { + if (ErrorCodes::isConnectionFatalMessageParseError(status.code())) { + // If this error needs to fail the connection, propagate it out. + internalAssert(status); + } + + auto opCtx = execContext->getOpCtx(); + BSONObjBuilder metadataBob; + execContext->behaviors->appendReplyMetadataOnError(opCtx, &metadataBob); + + BSONObjBuilder extraFieldsBuilder; + appendClusterAndOperationTime( + opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); + + auto replyBuilder = execContext->getReplyBuilder(); + generateErrorResponse( + opCtx, replyBuilder, status, metadataBob.obj(), extraFieldsBuilder.obj()); + + if (ErrorCodes::isA(status.code())) { + // Return the exception to the top to signal that the client connection should be + // closed. 
+ internalAssert(status); + } + }) + .then([execContext] { return makeCommandResponse(std::move(execContext)); }); +} + DbResponse receivedQuery(OperationContext* opCtx, const NamespaceString& nss, Client& c, @@ -1551,7 +1836,7 @@ DbResponse receivedQuery(OperationContext* opCtx, dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response); } catch (const AssertionException& e) { - behaviors.handleException(e, opCtx); + behaviors.handleException(e.toStatus(), opCtx); dbResponse.response.reset(); generateLegacyQueryErrorResponse(e, q, &op, &dbResponse.response); @@ -1742,32 +2027,40 @@ DbResponse receivedGetMore(OperationContext* opCtx, struct CommandOpRunner : HandleRequest::OpRunner { using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors); - // The hello/isMaster commands should take kMaxAwaitTimeMs at most, log if it takes twice - // that. - if (auto command = hr->currentOp().getCommand(); - command && (command->getName() == "hello" || command->getName() == "isMaster")) { - hr->slowMsOverride = - 2 * durationCount(SingleServerIsMasterMonitor::kMaxAwaitTime); - } - return r; + Future run() override { + return receivedCommands(executionContext); } }; -struct QueryOpRunner : HandleRequest::OpRunner { +// Allows wrapping synchronous code in futures without repeating the try-catch block. 
+struct SynchronousOpRunner : HandleRequest::OpRunner { using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - hr->opCtx->markKillOnClientDisconnect(); - return receivedQuery( - hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors); + virtual DbResponse runSync() = 0; + Future run() final try { return runSync(); } catch (const DBException& ex) { + return ex.toStatus(); } }; -struct GetMoreOpRunner : HandleRequest::OpRunner { - using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog); +struct QueryOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + auto opCtx = executionContext->getOpCtx(); + opCtx->markKillOnClientDisconnect(); + return receivedQuery(opCtx, + executionContext->nsString(), + executionContext->client(), + executionContext->getMessage(), + *executionContext->behaviors); + } +}; + +struct GetMoreOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + return receivedGetMore(executionContext->getOpCtx(), + executionContext->getMessage(), + executionContext->currentOp(), + &executionContext->forceLog); } }; @@ -1777,49 +2070,70 @@ struct GetMoreOpRunner : HandleRequest::OpRunner { * class provides a `run` that calls it and handles error reporting * via the `LastError` slot. 
*/ -struct FireAndForgetOpRunner : HandleRequest::OpRunner { - using HandleRequest::OpRunner::OpRunner; +struct FireAndForgetOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; virtual void runAndForget() = 0; - DbResponse run() final; + DbResponse runSync() final; }; struct KillCursorsOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->currentOp().ensureStarted(); - hr->slowMsOverride = 10; - receivedKillCursors(hr->opCtx, hr->m); + executionContext->currentOp().ensureStarted(); + executionContext->slowMsOverride = 10; + receivedKillCursors(executionContext->getOpCtx(), executionContext->getMessage()); } }; struct InsertOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedInsert(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedInsert(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); } }; struct UpdateOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedUpdate(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedUpdate(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); } }; struct DeleteOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedDelete(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedDelete(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); + } +}; + +struct UnsupportedOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + // For 
compatibility reasons, we only log incidents of receiving operations that are not + // supported and return an empty response to the caller. + LOGV2(21968, + "Operation isn't supported: {operation}", + "Operation is not supported", + "operation"_attr = static_cast(executionContext->op())); + executionContext->currentOp().done(); + executionContext->forceLog = true; + return {}; } }; std::unique_ptr HandleRequest::makeOpRunner() { - switch (op()) { + switch (executionContext->op()) { case dbQuery: - if (!nsString().isCommand()) + if (!executionContext->nsString().isCommand()) return std::make_unique(this); // FALLTHROUGH: it's a query containing a command case dbMsg: @@ -1835,96 +2149,93 @@ std::unique_ptr HandleRequest::makeOpRunner() { case dbDelete: return std::make_unique(this); default: - LOGV2(21968, - "Operation isn't supported: {operation}", - "Operation is not supported", - "operation"_attr = static_cast(op())); - return nullptr; + return std::make_unique(this); } } -DbResponse FireAndForgetOpRunner::run() { +DbResponse FireAndForgetOpRunner::runSync() { try { runAndForget(); } catch (const AssertionException& ue) { - LastError::get(hr->client()).setLastError(ue.code(), ue.reason()); + LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason()); LOGV2_DEBUG(21969, 3, "Caught Assertion in {networkOp}, continuing: {error}", "Assertion in fire-and-forget operation", - "networkOp"_attr = networkOpToString(hr->op()), + "networkOp"_attr = networkOpToString(executionContext->op()), "error"_attr = redact(ue)); - hr->currentOp().debug().errInfo = ue.toStatus(); + executionContext->currentOp().debug().errInfo = ue.toStatus(); } // A NotWritablePrimary error can be set either within // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above. // Either way, we want to throw an exception here, which will cause the client to be // disconnected. 
- if (LastError::get(hr->client()).hadNotPrimaryError()) { + if (LastError::get(executionContext->client()).hadNotPrimaryError()) { notPrimaryLegacyUnackWrites.increment(); uasserted(ErrorCodes::NotWritablePrimary, str::stream() << "Not-master error while processing '" - << networkOpToString(hr->op()) << "' operation on '" - << hr->nsString() << "' namespace via legacy " + << networkOpToString(executionContext->op()) << "' operation on '" + << executionContext->nsString() << "' namespace via legacy " << "fire-and-forget command execution."); } return {}; } -void HandleRequest::startOperation() { - if (client().isInDirectClient()) { +Future HandleRequest::startOperation() try { + auto opCtx = executionContext->getOpCtx(); + auto& client = executionContext->client(); + auto& currentOp = executionContext->currentOp(); + + if (client.isInDirectClient()) { if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) { invariant(!opCtx->inMultiDocumentTransaction() && !opCtx->lockState()->inAWriteUnitOfWork()); } } else { - LastError::get(client()).startRequest(); - AuthorizationSession::get(client())->startRequest(opCtx); + LastError::get(client).startRequest(); + AuthorizationSession::get(client)->startRequest(opCtx); // We should not be holding any locks at this point invariant(!opCtx->lockState()->isLocked()); } { - stdx::lock_guard lk(client()); + stdx::lock_guard lk(client); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. 
- currentOp().setNetworkOp_inlock(op()); - currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op())); + currentOp.setNetworkOp_inlock(executionContext->op()); + currentOp.setLogicalOp_inlock(networkOpToLogicalOp(executionContext->op())); } + return {}; +} catch (const DBException& ex) { + return ex.toStatus(); } -DbResponse HandleRequest::run() { - startOperation(); - DbResponse dbresponse; - if (auto opRunner = makeOpRunner()) { - dbresponse = opRunner->run(); - } else { - currentOp().done(); - forceLog = true; - } - completeOperation(dbresponse); - return dbresponse; -} +Future HandleRequest::completeOperation() try { + auto opCtx = executionContext->getOpCtx(); + auto& currentOp = executionContext->currentOp(); -void HandleRequest::completeOperation(const DbResponse& dbresponse) { // Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether // this op should be written to the profiler. - const bool shouldProfile = currentOp().completeAndLogOperation( - opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog); + const bool shouldProfile = + currentOp.completeAndLogOperation(opCtx, + MONGO_LOGV2_DEFAULT_COMPONENT, + executionContext->getResponse().response.size(), + executionContext->slowMsOverride, + executionContext->forceLog); Top::get(opCtx->getServiceContext()) .incrementGlobalLatencyStats( opCtx, - durationCount(currentOp().elapsedTimeExcludingPauses()), - currentOp().getReadWriteType()); + durationCount(currentOp.elapsedTimeExcludingPauses()), + currentOp.getReadWriteType()); if (shouldProfile) { // Performance profiling is on if (opCtx->lockState()->isReadLocked()) { LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock"); - } else if (client().isInDirectClient()) { + } else if (executionContext->client().isInDirectClient()) { LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient"); - } else if (behaviors.lockedForWriting()) { + } else if 
(executionContext->behaviors->lockedForWriting()) { // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post // lockedForWriting() call but prior to profile collection lock acquisition. LOGV2_DEBUG(21972, 1, "Note: not profiling because doing fsync+lock"); @@ -1932,11 +2243,14 @@ void HandleRequest::completeOperation(const DbResponse& dbresponse) { LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only"); } else { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - profile(opCtx, op()); + profile(opCtx, executionContext->op()); } } recordCurOpMetrics(opCtx); + return {}; +} catch (const DBException& ex) { + return ex.toStatus(); } } // namespace @@ -1950,15 +2264,27 @@ BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* comman return bob.obj(); } -Future ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, - const Message& m, - const Hooks& behaviors) noexcept { - try { - return Future::makeReady(HandleRequest{opCtx, m, behaviors}.run()); - } catch (DBException& e) { - LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e)); - return e.toStatus(); - } +Future ServiceEntryPointCommon::handleRequest( + OperationContext* opCtx, const Message& m, std::unique_ptr behaviors) noexcept { + auto hr = std::make_shared(opCtx, m, std::move(behaviors)); + return hr->startOperation() + .then([hr]() -> Future { + auto opRunner = hr->makeOpRunner(); + invariant(opRunner); + return opRunner->run().then( + [execContext = hr->executionContext](DbResponse response) -> void { + // Set the response upon successful execution + execContext->setResponse(std::move(response)); + }); + }) + .then([hr] { return hr->completeOperation(); }) + .onCompletion([hr](Status status) -> Future { + if (!status.isOK()) { + LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(status)); + return status; + } + return hr->executionContext->getResponse(); + }); } ServiceEntryPointCommon::Hooks::~Hooks() = 
default; diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h index a5623e41c88..dbf95b165a2 100644 --- a/src/mongo/db/service_entry_point_common.h +++ b/src/mongo/db/service_entry_point_common.h @@ -84,7 +84,7 @@ struct ServiceEntryPointCommon { virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0; - virtual void handleException(const DBException& e, OperationContext* opCtx) const = 0; + virtual void handleException(const Status& status, OperationContext* opCtx) const = 0; virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0; @@ -101,7 +101,7 @@ struct ServiceEntryPointCommon { static Future handleRequest(OperationContext* opCtx, const Message& m, - const Hooks& hooks) noexcept; + std::unique_ptr hooks) noexcept; /** * Produce a new object based on cmdObj, but with redactions applied as specified by diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 75b28b6595a..4347ffa8ac4 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -182,9 +182,9 @@ public: CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj); } - void handleException(const DBException& e, OperationContext* opCtx) const override { + void handleException(const Status& status, OperationContext* opCtx) const override { // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo()) { + if (auto sce = status.extraInfo()) { // A config server acting as a router may return a StaleConfig exception, but a config // server won't contain data for a sharded collection, so skip handling the exception. 
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { @@ -204,7 +204,7 @@ public: onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived()) .ignore(); } - } else if (auto sce = e.extraInfo()) { + } else if (auto sce = status.extraInfo()) { if (!opCtx->getClient()->isInDirectClient()) { onDbVersionMismatchNoExcept( opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted()) @@ -271,7 +271,7 @@ public: Future ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) noexcept { - return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); + return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique()); } } // namespace mongo diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp index 1dbf1a52c95..158b55a10e6 100644 --- a/src/mongo/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/embedded/service_entry_point_embedded.cpp @@ -93,7 +93,7 @@ public: void attachCurOpErrInfo(OperationContext*, const BSONObj&) const override {} - void handleException(const DBException& e, OperationContext* opCtx) const override {} + void handleException(const Status& status, OperationContext* opCtx) const override {} void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {} @@ -117,7 +117,7 @@ Future ServiceEntryPointEmbedded::handleRequest(OperationContext* op // guarantees of the state (that they have run). checked_cast(opCtx->getServiceContext()->getPeriodicRunner()) ->tryPump(); - return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); + return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique()); } void ServiceEntryPointEmbedded::startSession(transport::SessionHandle session) { -- cgit v1.2.1