summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-17 17:36:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-06 04:33:51 +0000
commita0fce10bb34cb04455f02948352f5eb37b38ad75 (patch)
treebc97ee65b351239040bf7cb1b3ff3c1cabe6ba54 /src/mongo/db
parentca3c18008bff68e2684ad5954452c6513f7a744e (diff)
downloadmongo-a0fce10bb34cb04455f02948352f5eb37b38ad75.tar.gz
SERVER-49107 Add support for async execution to MongoD command path
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands.cpp24
-rw-r--r--src/mongo/db/commands.h50
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp54
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h43
-rw-r--r--src/mongo/db/request_execution_context.h131
-rw-r--r--src/mongo/db/service_entry_point_common.cpp1888
-rw-r--r--src/mongo/db/service_entry_point_common.h4
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp8
8 files changed, 1381 insertions, 821 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index a2c7a804538..83fba136be7 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -185,6 +185,20 @@ void CommandHelpers::runCommandInvocation(OperationContext* opCtx,
}
}
+Future<void> CommandHelpers::runCommandInvocationAsync(
+ std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation) try {
+ auto hooks = getCommandInvocationHooksHandle(rec->getOpCtx()->getServiceContext());
+ if (hooks)
+ hooks->onBeforeAsyncRun(rec, invocation.get());
+ return invocation->runAsync(rec).then([rec, hooks, invocation] {
+ if (hooks)
+ hooks->onAfterAsyncRun(rec, invocation.get());
+ });
+} catch (const DBException& e) {
+ return e.toStatus();
+}
+
void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx,
const CommandInvocation* invocation,
const OpMsgRequest& request,
@@ -801,6 +815,16 @@ private:
}
}
+ Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) override {
+ return _command->runAsync(rec, _dbName).onError([rec](Status status) {
+ if (status.code() != ErrorCodes::FailedToRunWithReplyBuilder)
+ return status;
+ BSONObjBuilder bob = rec->getReplyBuilder()->getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bob, false);
+ return Status::OK();
+ });
+ }
+
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index d1cf2b369c8..e1805991cab 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -30,6 +30,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <fmt/format.h>
#include <functional>
#include <string>
#include <vector>
@@ -47,10 +48,12 @@
#include "mongo/db/query/explain.h"
#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -92,11 +95,27 @@ public:
CommandInvocation* invocation) = 0;
/**
+ * A behavior to perform before CommandInvocation::runAsync(). Defaults to `onBeforeRun(...)`.
+ */
+ virtual void onBeforeAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
+ CommandInvocation* invocation) {
+ onBeforeRun(rec->getOpCtx(), rec->getRequest(), invocation);
+ }
+
+ /**
* A behavior to perform after CommandInvocation::run()
*/
virtual void onAfterRun(OperationContext* opCtx,
const OpMsgRequest& request,
CommandInvocation* invocation) = 0;
+
+ /**
+ * A behavior to perform after CommandInvocation::runAsync(). Defaults to `onAfterRun(...)`.
+ */
+ virtual void onAfterAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
+ CommandInvocation* invocation) {
+ onAfterRun(rec->getOpCtx(), rec->getRequest(), invocation);
+ }
};
// Various helpers unrelated to any single command or to the command registry.
@@ -236,6 +255,15 @@ struct CommandHelpers {
rpc::ReplyBuilderInterface* response);
/**
+ * Runs a previously parsed command and propagates the result to the ReplyBuilderInterface. For
+ * commands that do not offer an implementation tailored for asynchronous execution, the future
+ * schedules the execution of the default implementation, historically designed for synchronous
+ * execution.
+ */
+ static Future<void> runCommandInvocationAsync(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation);
+
+ /**
* If '!invocation', we're logging about a Command pre-parse. It has to punt on the logged
* namespace, giving only the request's $db. Since the Command hasn't parsed the request body,
* we can't know the collection part of that namespace, so we leave it blank in the audit log.
@@ -568,6 +596,16 @@ public:
*/
virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0;
+ /**
+ * Returns a future that can schedule asynchronous execution of the command. By default, the
+ * future falls back to the execution of `run(...)`, thus the default semantics of
+ * `runAsync(...)` is identical to that of `run(...)`.
+ */
+ virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) {
+ run(rec->getOpCtx(), rec->getReplyBuilder());
+ return Status::OK();
+ }
+
virtual void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) {
@@ -705,6 +743,18 @@ public:
rpc::ReplyBuilderInterface* replyBuilder) = 0;
/**
+ * Provides a future that may run the command asynchronously. By default, it falls back to
+ * runWithReplyBuilder.
+ */
+ virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec, std::string db) {
+ if (!runWithReplyBuilder(
+ rec->getOpCtx(), db, rec->getRequest().body, rec->getReplyBuilder()))
+ return Status(ErrorCodes::FailedToRunWithReplyBuilder,
+ fmt::format("Failed to run command: {}", rec->getCommand()->getName()));
+ return Status::OK();
+ }
+
+ /**
* Commands which can be explained override this method. Any operation which has a query
* part and executes as a tree of execution stages can be explained. A command should
* implement explain by:
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index eb0e257d045..fc98af33f70 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -43,6 +43,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
@@ -222,6 +223,59 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) {
});
}
+class MigrationConflictHandler : public std::enable_shared_from_this<MigrationConflictHandler> {
+public:
+ MigrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable)
+ : _rec(std::move(rec)), _callable(std::move(callable)) {}
+
+ Future<void> run() try {
+ checkIfCanReadOrBlock(_rec->getOpCtx(), _rec->getRequest().getDatabase());
+ // callable will modify replyBuilder.
+ return _callable()
+ .then([this, anchor = shared_from_this()] { _checkReplyForTenantMigrationConflict(); })
+ .onError<ErrorCodes::TenantMigrationConflict>(
+ [this, anchor = shared_from_this()](Status status) {
+ _handleTenantMigrationConflict(std::move(status));
+ });
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+
+private:
+ void _checkReplyForTenantMigrationConflict() {
+ auto replyBodyBuilder = _rec->getReplyBuilder()->getBodyBuilder();
+
+ // getStatusFromWriteCommandReply expects an 'ok' field.
+ CommandHelpers::extractOrAppendOk(replyBodyBuilder);
+
+ // Commands such as insert, update, delete, and applyOps return the result as a status
+ // rather than throwing.
+ const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
+
+ // Only throw `TenantMigrationConflict` exceptions.
+ if (status == ErrorCodes::TenantMigrationConflict)
+ internalAssert(status);
+ }
+
+ void _handleTenantMigrationConflict(Status status) {
+ auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>();
+ invariant(migrationConflictInfo);
+
+ if (auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker()) {
+ uassertStatusOK(mtab->waitUntilCommittedOrAborted(_rec->getOpCtx()));
+ }
+ }
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const unique_function<Future<void>()> _callable;
+};
+
+Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable) {
+ return std::make_shared<MigrationConflictHandler>(std::move(rec), std::move(callable))->run();
+}
+
} // namespace tenant_migration_donor
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 71b522b9657..ead99e97bd2 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -35,9 +35,12 @@
#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -81,41 +84,13 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName);
void recoverTenantMigrationAccessBlockers(OperationContext* opCtx);
/**
- * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated
- * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then
- * throws TenantMigrationCommitted or TenantMigrationAborted.
+ * Returns a future that asynchronously schedules and runs the argument function 'callable'. If it
+ * throws a TenantMigrationConflict error (as indicated in 'replyBuilder'), clears 'replyBuilder'
+ * and blocks until the migration commits or aborts, then returns TenantMigrationCommitted or
+ * TenantMigrationAborted.
*/
-template <typename Callable>
-void migrationConflictHandler(OperationContext* opCtx,
- StringData dbName,
- Callable&& callable,
- rpc::ReplyBuilderInterface* replyBuilder) {
- checkIfCanReadOrBlock(opCtx, dbName);
-
- try {
- // callable will modify replyBuilder.
- callable();
- auto replyBodyBuilder = replyBuilder->getBodyBuilder();
-
- // getStatusFromWriteCommandReply expects an 'ok' field.
- CommandHelpers::extractOrAppendOk(replyBodyBuilder);
-
- // applyOps returns the result as a status rather than throwing.
- const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
-
- if (status == ErrorCodes::TenantMigrationConflict) {
- uassertStatusOK(status);
- }
- return;
- } catch (const TenantMigrationConflictException& ex) {
- auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>();
- invariant(migrationConflictInfo);
-
- if (auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker()) {
- uassertStatusOK(mtab->waitUntilCommittedOrAborted(opCtx));
- }
- }
-}
+Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable);
} // namespace tenant_migration_donor
diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h
new file mode 100644
index 00000000000..b34c7e4f3b7
--- /dev/null
+++ b/src/mongo/db/request_execution_context.h
@@ -0,0 +1,131 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/db/dbmessage.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/rpc/message.h"
+#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+class Command;
+
+/**
+ * Captures the execution context for a command to allow safe, shared and asynchronous accesses.
+ * This class owns all objects that participate in command execution (e.g., `request`). The only
+ * exceptions are `opCtx` and `command`. The `opCtx` remains valid so long as its corresponding
+ * client is attached to the executor thread. In case of `command`, it is a global, static
+ * construct and is safe to be accessed through raw pointers.
+ * Any access from a client thread that does not own the `opCtx`, or after the `opCtx` is
+ * released is strictly forbidden.
+ */
+class RequestExecutionContext {
+public:
+ RequestExecutionContext() = delete;
+ RequestExecutionContext(const RequestExecutionContext&) = delete;
+ RequestExecutionContext(RequestExecutionContext&&) = delete;
+
+ explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+ auto getOpCtx() const {
+ invariant(_isOnClientThread());
+ return _opCtx;
+ }
+
+ void setMessage(Message message) {
+ invariant(_isOnClientThread() && !_message);
+ _message = std::move(message);
+ _dbmsg = std::make_unique<DbMessage>(_message.get());
+ }
+ const Message& getMessage() const {
+ invariant(_isOnClientThread() && _message);
+ return _message.get();
+ }
+ DbMessage& getDbMessage() const {
+ invariant(_isOnClientThread() && _dbmsg);
+ return *_dbmsg.get();
+ }
+
+ void setRequest(OpMsgRequest request) {
+ invariant(_isOnClientThread() && !_request);
+ _request = std::move(request);
+ }
+ const OpMsgRequest& getRequest() const {
+ invariant(_isOnClientThread() && _request);
+ return _request.get();
+ }
+
+ void setCommand(Command* command) {
+ invariant(_isOnClientThread() && !_command);
+ _command = command;
+ }
+ Command* getCommand() const {
+ invariant(_isOnClientThread());
+ return _command;
+ }
+
+ void setReplyBuilder(std::unique_ptr<rpc::ReplyBuilderInterface> replyBuilder) {
+ invariant(_isOnClientThread() && !_replyBuilder);
+ _replyBuilder = std::move(replyBuilder);
+ }
+ auto getReplyBuilder() const {
+ invariant(_isOnClientThread() && _replyBuilder);
+ return _replyBuilder.get();
+ }
+
+ void setResponse(DbResponse response) {
+ invariant(_isOnClientThread());
+ _response = std::move(response);
+ }
+ DbResponse& getResponse() {
+ invariant(_isOnClientThread());
+ return _response;
+ }
+
+private:
+ bool _isOnClientThread() const {
+ return _opCtx != nullptr && Client::getCurrent() == _opCtx->getClient();
+ }
+
+ OperationContext* const _opCtx;
+ boost::optional<Message> _message;
+ std::unique_ptr<DbMessage> _dbmsg;
+ boost::optional<OpMsgRequest> _request;
+ Command* _command = nullptr;
+ std::unique_ptr<rpc::ReplyBuilderInterface> _replyBuilder;
+ DbResponse _response;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 5a4e9fddef6..b70b330acb9 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -72,6 +72,7 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/db/run_op_kill_cursors.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
@@ -129,58 +130,77 @@ namespace {
using namespace fmt::literals;
-/** Allows for the very complex handleRequest function to be decomposed into parts. */
+/*
+ * Allows for the very complex handleRequest function to be decomposed into parts.
+ * It also provides the infrastructure to futurize the process of executing commands.
+ */
struct HandleRequest {
- struct OpRunner {
- explicit OpRunner(HandleRequest* hr) : hr{hr} {}
- virtual ~OpRunner() = default;
- virtual DbResponse run() = 0;
- HandleRequest* hr;
- };
+ // Maintains the context (e.g., opCtx and replyBuilder) required for command execution.
+ class ExecutionContext final : public RequestExecutionContext {
+ public:
+ ExecutionContext(OperationContext* opCtx,
+ Message msg,
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> hooks)
+ : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) {
+ // It also initializes dbMessage, which is accessible via getDbMessage()
+ setMessage(std::move(msg));
+ }
+ ~ExecutionContext() = default;
- HandleRequest(OperationContext* opCtx,
- const Message& m,
- const ServiceEntryPointCommon::Hooks& behaviors)
- : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {}
+ Client& client() const {
+ return *getOpCtx()->getClient();
+ }
- DbResponse run();
+ auto session() const {
+ return client().session();
+ }
- NetworkOp op() const {
- return m.operation();
- }
+ NetworkOp op() const {
+ return getMessage().operation();
+ }
- CurOp& currentOp() {
- return *CurOp::get(opCtx);
- }
+ CurOp& currentOp() {
+ return *CurOp::get(getOpCtx());
+ }
- NamespaceString nsString() const {
- if (!dbmsg.messageShouldHaveNs())
- return {};
- return NamespaceString(dbmsg.getns());
- }
+ NamespaceString nsString() const {
+ auto& dbmsg = getDbMessage();
+ if (!dbmsg.messageShouldHaveNs())
+ return {};
+ return NamespaceString(dbmsg.getns());
+ }
- void assertValidNsString() {
- if (!nsString().isValid()) {
- uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false);
+ void assertValidNsString() {
+ if (!nsString().isValid()) {
+ uassert(
+ 16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false);
+ }
}
- }
- std::unique_ptr<OpRunner> makeOpRunner();
- void startOperation();
- void completeOperation(const DbResponse& dbresponse);
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors;
+ boost::optional<long long> slowMsOverride;
+ bool forceLog = false;
+ };
- Client& client() const {
- return *opCtx->getClient();
- }
+ struct OpRunner {
+ explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext} {}
+ virtual ~OpRunner() = default;
+ virtual Future<DbResponse> run() = 0;
+ std::shared_ptr<ExecutionContext> executionContext;
+ };
- OperationContext* opCtx;
- const Message& m;
- const ServiceEntryPointCommon::Hooks& behaviors;
+ HandleRequest(OperationContext* opCtx,
+ const Message& msg,
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors)
+ : executionContext(std::make_shared<ExecutionContext>(
+ opCtx, const_cast<Message&>(msg), std::move(behaviors))) {}
+
+ std::unique_ptr<OpRunner> makeOpRunner();
- DbMessage dbmsg;
+ Future<void> startOperation();
+ Future<void> completeOperation();
- boost::optional<long long> slowMsOverride;
- bool forceLog = false;
+ std::shared_ptr<ExecutionContext> executionContext;
};
void generateLegacyQueryErrorResponse(const AssertionException& exception,
@@ -253,22 +273,23 @@ void generateLegacyQueryErrorResponse(const AssertionException& exception,
response->setData(bb.release());
}
-void registerError(OperationContext* opCtx, const DBException& exception) {
- LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason());
- CurOp::get(opCtx)->debug().errInfo = exception.toStatus();
+void registerError(OperationContext* opCtx, const Status& status) {
+ LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason());
+ CurOp::get(opCtx)->debug().errInfo = status;
}
void generateErrorResponse(OperationContext* opCtx,
rpc::ReplyBuilderInterface* replyBuilder,
- const DBException& exception,
+ const Status& status,
const BSONObj& replyMetadata,
BSONObj extraFields = {}) {
- registerError(opCtx, exception);
+ invariant(!status.isOK());
+ registerError(opCtx, status);
// We could have thrown an exception after setting fields in the builder,
// so we need to reset it to a clean state just to be sure.
replyBuilder->reset();
- replyBuilder->setCommandReply(exception.toStatus(), extraFields);
+ replyBuilder->setCommandReply(status, extraFields);
replyBuilder->getBodyBuilder().appendElements(replyMetadata);
}
@@ -548,14 +569,248 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx,
topologyVersion.serialize(&topologyVersionBuilder);
}
-void _abortUnpreparedOrStashPreparedTransaction(
- OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) {
- const bool isPrepared = txnParticipant->transactionIsPrepared();
+class ExecCommandDatabase : public std::enable_shared_from_this<ExecCommandDatabase> {
+public:
+ explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext)
+ : _execContext(std::move(execContext)) {}
+
+ static Future<void> run(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ return std::make_shared<ExecCommandDatabase>(std::move(execContext))->_makeFutureChain();
+ }
+
+ std::shared_ptr<HandleRequest::ExecutionContext> getExecutionContext() {
+ return _execContext;
+ }
+ std::shared_ptr<CommandInvocation> getInvocation() {
+ return _invocation;
+ }
+ const OperationSessionInfoFromClient& getSessionOptions() const {
+ return _sessionOptions;
+ }
+ BSONObjBuilder* getExtraFieldsBuilder() {
+ return &_extraFieldsBuilder;
+ }
+ const LogicalTime& getStartOperationTime() const {
+ return _startOperationTime;
+ }
+
+ bool isHello() const {
+ return _execContext->getCommand()->getName() == "hello"_sd ||
+ _execContext->getCommand()->getName() == "isMaster"_sd;
+ }
+
+private:
+ // Returns a future that executes a command after stripping metadata, performing authorization
+ // checks, handling audit impersonation, and (potentially) setting maintenance mode. The future
+ // also checks that the command is permissible to run on the node given its current replication
+ // state. All the logic here is independent of any particular command; any functionality
+ // relevant to a specific command should be confined to its run() method.
+ Future<void> _makeFutureChain() {
+ return _parseCommand().then([this, anchor = shared_from_this()] {
+ return _initiateCommand()
+ .then([this] { return _commandExec(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ return _handleFailure(std::move(status));
+ });
+ });
+ }
+
+ Future<void> _parseCommand() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future).then([this, anchor = shared_from_this()] {
+ auto opCtx = _execContext->getOpCtx();
+ auto command = _execContext->getCommand();
+ auto& request = _execContext->getRequest();
+
+ CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
+ _startOperationTime = getClientOperationTime(opCtx);
+
+ _invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, _invocation);
+
+ const auto session = _execContext->getOpCtx()->getClient()->session();
+ if (session) {
+ if (!opCtx->isExhaust() || !isHello()) {
+ InExhaustHello::get(session.get())
+ ->setInExhaust(false, request.getCommandName());
+ }
+ }
+
+ // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
+ if (isHello()) {
+ _execContext->slowMsOverride =
+ 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
+ }
+ });
+ pf.promise.emplaceValue();
+ return future;
+ }
+
+ // Any logic, such as authorization and auditing, that must precede execution of the command.
+ Future<void> _initiateCommand();
+
+ // Returns the future chain that executes the parsed command against the database.
+ Future<void> _commandExec();
+
+ // Any error-handling logic that must be performed if the command initiation/execution fails.
+ void _handleFailure(Status status);
+
+ bool _isInternalClient() const {
+ return _execContext->session() &&
+ _execContext->session()->getTags() & transport::Session::kInternalClient;
+ }
+
+ const std::shared_ptr<HandleRequest::ExecutionContext> _execContext;
+
+ // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share
+ // execution state without concerning the lifetime of these variables.
+ BSONObjBuilder _extraFieldsBuilder;
+ std::shared_ptr<CommandInvocation> _invocation;
+ LogicalTime _startOperationTime;
+ OperationSessionInfoFromClient _sessionOptions;
+ std::unique_ptr<PolymorphicScoped> _scoped;
+};
+
+class RunCommandImpl : public std::enable_shared_from_this<RunCommandImpl> {
+public:
+ explicit RunCommandImpl(std::shared_ptr<ExecCommandDatabase> ecd)
+ : _ecd(std::move(ecd)),
+ _shouldCheckOutSession(
+ _ecd->getSessionOptions().getTxnNumber() &&
+ !shouldCommandSkipSessionCheckout(_ecd->getInvocation()->definition()->getName())),
+ _shouldWaitForWriteConcern(_ecd->getInvocation()->supportsWriteConcern() ||
+ _ecd->getInvocation()->definition()->getLogicalOp() ==
+ LogicalOp::opGetMore) {}
+
+ static Future<void> run(std::shared_ptr<ExecCommandDatabase> ecd) {
+ return std::make_shared<RunCommandImpl>(std::move(ecd))->_makeFutureChain();
+ }
+
+private:
+ Future<void> _makeFutureChain();
+
+ // Anchor for references to attributes defined in `ExecCommandDatabase` (e.g., sessionOptions).
+ const std::shared_ptr<ExecCommandDatabase> _ecd;
+
+ // Any code that must run before command execution (e.g., reserving bytes for reply builder).
+ Future<void> _prologue();
+
+ // Runs the command without waiting for write concern
+ Future<void> _runCommand();
+
+ class RunCommandAndWaitForWriteConcern {
+ public:
+ explicit RunCommandAndWaitForWriteConcern(std::shared_ptr<RunCommandImpl> rci)
+ : _rci(std::move(rci)),
+ _execContext(_rci->_ecd->getExecutionContext()),
+ _oldWriteConcern(_execContext->getOpCtx()->getWriteConcern()) {}
+
+ ~RunCommandAndWaitForWriteConcern() {
+ _execContext->getOpCtx()->setWriteConcern(_oldWriteConcern);
+ }
+
+ static Future<void> run(std::shared_ptr<RunCommandImpl>);
+
+ private:
+ void _waitForWriteConcern(BSONObjBuilder& bb);
+
+ void _setup();
+ Future<void> _run();
+ Future<void> _onRunCompletion(Status);
+
+ const std::shared_ptr<RunCommandImpl> _rci;
+ const std::shared_ptr<HandleRequest::ExecutionContext> _execContext;
+
+ // Allows changing the write concern while running the command and resetting on destruction.
+ const WriteConcernOptions _oldWriteConcern;
+ boost::optional<repl::OpTime> _lastOpBeforeRun;
+ boost::optional<WriteConcernOptions> _extractedWriteConcern;
+ };
+
+ // Any code that must run after command execution -- returns true on successful execution.
+ Future<bool> _epilogue();
+
+ bool _isInternalClient() const {
+ auto session = _ecd->getExecutionContext()->session();
+ return session && session->getTags() & transport::Session::kInternalClient;
+ }
+
+ // Whether invoking the command requires a session to be checked out.
+ const bool _shouldCheckOutSession;
+
+ // getMore operations inherit a WriteConcern from their originating cursor. For example, if the
+ // originating command was an aggregate with a $out and batchSize: 0. Note that if the command
+ // only performed reads then we will not need to wait at all.
+ const bool _shouldWaitForWriteConcern;
+};
+
+// Simplifies the interface for invoking commands and allows asynchronous execution of command
+// invocations.
+class InvokeCommand : public std::enable_shared_from_this<InvokeCommand> {
+public:
+ explicit InvokeCommand(std::shared_ptr<ExecCommandDatabase> ecd) : _ecd(std::move(ecd)) {}
+
+ Future<void> run(const bool checkoutSession);
+
+private:
+ class SessionCheckoutPath;
+
+ Future<void> _runInvocation() noexcept;
+
+ const std::shared_ptr<ExecCommandDatabase> _ecd;
+};
+
+class InvokeCommand::SessionCheckoutPath
+ : public std::enable_shared_from_this<InvokeCommand::SessionCheckoutPath> {
+public:
+ SessionCheckoutPath(std::shared_ptr<InvokeCommand> parent) : _parent(std::move(parent)) {}
+
+ Future<void> run();
+
+private:
+ void _cleanupIncompleteTxn();
+
+ Future<void> _checkOutSession();
+ void _tapError(Status);
+ Future<void> _commitInvocation();
+
+ const std::shared_ptr<InvokeCommand> _parent;
+
+ std::unique_ptr<MongoDOperationContextSession> _sessionTxnState;
+ boost::optional<TransactionParticipant::Participant> _txnParticipant;
+ boost::optional<ScopeGuard<std::function<void()>>> _guard;
+};
+
+Future<void> InvokeCommand::run(const bool checkoutSession) {
+ auto [past, present] = makePromiseFuture<void>();
+ auto future = std::move(present).then([this, checkoutSession, anchor = shared_from_this()] {
+ if (checkoutSession)
+ return std::make_shared<SessionCheckoutPath>(std::move(anchor))->run();
+ return _runInvocation();
+ });
+ past.emplaceValue();
+ return future;
+}
+
+Future<void> InvokeCommand::SessionCheckoutPath::run() {
+ auto anchor = shared_from_this();
+ return makeReadyFutureWith([] {})
+ .then([this, anchor] { return _checkOutSession(); })
+ .then([this, anchor] {
+ return _parent->_runInvocation().tapError(
+ [this, anchor](Status status) { return _tapError(std::move(status)); });
+ })
+ .then([this, anchor] { return _commitInvocation(); });
+}
+
+void InvokeCommand::SessionCheckoutPath::_cleanupIncompleteTxn() {
+ auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx();
+ const bool isPrepared = _txnParticipant->transactionIsPrepared();
try {
if (isPrepared)
- txnParticipant->stashTransactionResources(opCtx);
- else if (txnParticipant->transactionIsOpen())
- txnParticipant->abortTransaction(opCtx);
+ _txnParticipant->stashTransactionResources(opCtx);
+ else if (_txnParticipant->transactionIsOpen())
+ _txnParticipant->abortTransaction(opCtx);
} catch (...) {
// It is illegal for this to throw so we catch and log this here for diagnosability.
LOGV2_FATAL_CONTINUE(21974,
@@ -570,27 +825,18 @@ void _abortUnpreparedOrStashPreparedTransaction(
}
}
-void invokeWithNoSession(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- rpc::ReplyBuilderInterface* replyBuilder) {
- tenant_migration_donor::migrationConflictHandler(
- opCtx,
- request.getDatabase(),
- [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
- replyBuilder);
-}
+Future<void> InvokeCommand::SessionCheckoutPath::_checkOutSession() {
+ auto ecd = _parent->_ecd;
+ auto execContext = ecd->getExecutionContext();
+ auto opCtx = execContext->getOpCtx();
+ CommandInvocation* invocation = ecd->getInvocation().get();
+ const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions();
-void invokeWithSessionCheckedOut(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- const OperationSessionInfoFromClient& sessionOptions,
- rpc::ReplyBuilderInterface* replyBuilder) {
// This constructor will check out the session. It handles the appropriate state management
// for both multi-statement transactions and retryable writes. Currently, only requests with
// a transaction number will check out the session.
- MongoDOperationContextSession sessionTxnState(opCtx);
- auto txnParticipant = TransactionParticipant::get(opCtx);
+ _sessionTxnState = std::make_unique<MongoDOperationContextSession>(opCtx);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx));
if (!opCtx->getClient()->isInDirectClient()) {
bool beganOrContinuedTxn{false};
@@ -598,13 +844,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
// transaction on that session.
while (!beganOrContinuedTxn) {
try {
- txnParticipant.beginOrContinue(opCtx,
- *sessionOptions.getTxnNumber(),
- sessionOptions.getAutocommit(),
- sessionOptions.getStartTransaction());
+ _txnParticipant->beginOrContinue(opCtx,
+ *sessionOptions.getTxnNumber(),
+ sessionOptions.getAutocommit(),
+ sessionOptions.getStartTransaction());
beganOrContinuedTxn = true;
} catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) {
- auto prepareCompleted = txnParticipant.onExitPrepare();
+ auto prepareCompleted = _txnParticipant->onExitPrepare();
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&waitAfterNewStatementBlocksBehindPrepare,
@@ -636,21 +882,19 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
// transactions on failure to unstash the transaction resources to opCtx. We don't want to
// have this error guard for beginOrContinue as it can abort the transaction for any
// accidental invalid statements in the transaction.
- auto abortOnError = makeGuard([&txnParticipant, opCtx] {
- if (txnParticipant.transactionIsInProgress()) {
- txnParticipant.abortTransaction(opCtx);
+ auto abortOnError = makeGuard([&] {
+ if (_txnParticipant->transactionIsInProgress()) {
+ _txnParticipant->abortTransaction(opCtx);
}
});
- txnParticipant.unstashTransactionResources(opCtx, invocation->definition()->getName());
+ _txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName());
// Unstash success.
abortOnError.dismiss();
}
- auto guard = makeGuard([opCtx, &txnParticipant] {
- _abortUnpreparedOrStashPreparedTransaction(opCtx, &txnParticipant);
- });
+ _guard.emplace([this] { _cleanupIncompleteTxn(); });
if (!opCtx->getClient()->isInDirectClient()) {
const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
@@ -675,23 +919,31 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
(cmdName == "create"_sd || cmdName == "createIndexes"_sd)) {
if (!readConcernSupport.readConcernSupport.isOK()) {
uassertStatusOK(readConcernSupport.readConcernSupport.withContext(
- str::stream() << "Command " << cmdName
- << " does not support this transaction's "
- << readConcernArgs.toString()));
+ "Command {} does not support this transaction's {}"_format(
+ cmdName, readConcernArgs.toString())));
}
}
}
// Use the API parameters that were stored when the transaction was initiated.
- APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx);
+ APIParameters::get(opCtx) = _txnParticipant->getAPIParameters(opCtx);
- try {
- tenant_migration_donor::migrationConflictHandler(
- opCtx,
- request.getDatabase(),
- [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
- replyBuilder);
- } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) {
+ return Status::OK();
+}
+
+Future<void> InvokeCommand::_runInvocation() noexcept {
+ auto execContext = _ecd->getExecutionContext();
+ return tenant_migration_donor::migrationConflictHandler(
+ execContext, [execContext, invocation = _ecd->getInvocation()] {
+ return CommandHelpers::runCommandInvocationAsync(std::move(execContext),
+ std::move(invocation));
+ });
+}
+
+void InvokeCommand::SessionCheckoutPath::_tapError(Status status) {
+ auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx();
+ const OperationSessionInfoFromClient& sessionOptions = _parent->_ecd->getSessionOptions();
+ if (status.code() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
// Exceptions are used to resolve views in a sharded cluster, so they should be handled
// specially to avoid unnecessary aborts.
@@ -704,49 +956,47 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
// avoid leaving it orphaned in this case, which is fine even if it is re-targeted
// because the retry will include "startTransaction" again and "restart" a transaction
// at the active txnNumber.
- throw;
+ return;
}
// If this shard has completed an earlier statement for this transaction, it must already be
// in the transaction's participant list, so it is guaranteed to learn its outcome.
- txnParticipant.stashTransactionResources(opCtx);
- guard.dismiss();
- throw;
- } catch (const ExceptionFor<ErrorCodes::WouldChangeOwningShard>&) {
- txnParticipant.stashTransactionResources(opCtx);
- txnParticipant.resetRetryableWriteState(opCtx);
- guard.dismiss();
- throw;
+ _txnParticipant->stashTransactionResources(opCtx);
+ _guard->dismiss();
+ } else if (status.code() == ErrorCodes::WouldChangeOwningShard) {
+ _txnParticipant->stashTransactionResources(opCtx);
+ _txnParticipant->resetRetryableWriteState(opCtx);
+ _guard->dismiss();
}
+}
+Future<void> InvokeCommand::SessionCheckoutPath::_commitInvocation() {
+ auto execContext = _parent->_ecd->getExecutionContext();
+ auto replyBuilder = execContext->getReplyBuilder();
if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) {
// If ok is present, use its truthiness.
if (!okField.trueValue()) {
- return;
+ return Status::OK();
}
}
// Stash or commit the transaction when the command succeeds.
- txnParticipant.stashTransactionResources(opCtx);
- guard.dismiss();
+ _txnParticipant->stashTransactionResources(execContext->getOpCtx());
+ _guard->dismiss();
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- auto txnResponseMetadata = txnParticipant.getResponseMetadata();
+ auto txnResponseMetadata = _txnParticipant->getResponseMetadata();
auto bodyBuilder = replyBuilder->getBodyBuilder();
txnResponseMetadata.serialize(&bodyBuilder);
}
+ return Status::OK();
}
-bool runCommandImpl(OperationContext* opCtx,
- CommandInvocation* invocation,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder,
- LogicalTime startOperationTime,
- const ServiceEntryPointCommon::Hooks& behaviors,
- BSONObjBuilder* extraFieldsBuilder,
- const OperationSessionInfoFromClient& sessionOptions) {
- const Command* command = invocation->definition();
+Future<void> RunCommandImpl::_prologue() try {
+ auto execContext = _ecd->getExecutionContext();
+ auto opCtx = execContext->getOpCtx();
+ const Command* command = _ecd->getInvocation()->definition();
auto bytesToReserve = command->reserveBytesForReply();
// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
@@ -755,19 +1005,7 @@ bool runCommandImpl(OperationContext* opCtx,
if (kDebugBuild)
bytesToReserve = 0;
#endif
- replyBuilder->reserveBytes(bytesToReserve);
-
- const auto isInternalClient = opCtx->getClient()->session() &&
- (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
-
- const bool shouldCheckOutSession =
- sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName());
-
- // getMore operations inherit a WriteConcern from their originating cursor. For example, if the
- // originating command was an aggregate with a $out and batchSize: 0. Note that if the command
- // only performed reads then we will not need to wait at all.
- const bool shouldWaitForWriteConcern =
- invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore;
+ execContext->getReplyBuilder()->reserveBytes(bytesToReserve);
// Record readConcern usages for commands run outside of transactions, excluding DBDirectClient.
// For commands inside a transaction, they inherit the readConcern from the transaction. So we
@@ -777,139 +1015,18 @@ bool runCommandImpl(OperationContext* opCtx,
ServerReadConcernMetrics::get(opCtx)->recordReadConcern(repl::ReadConcernArgs::get(opCtx),
false /* isTransaction */);
}
+ return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
+}
- if (shouldWaitForWriteConcern) {
- auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-
- // Change the write concern while running the command.
- const auto oldWC = opCtx->getWriteConcern();
- ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
-
- boost::optional<WriteConcernOptions> extractedWriteConcern;
- if (command->getLogicalOp() == LogicalOp::opGetMore) {
- // WriteConcern will be set up during command processing, it must not be specified on
- // the command body.
- behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- } else {
- // WriteConcern should always be explicitly specified by operations received on shard
- // and config servers, even if it is empty (ie. writeConcern: {}). In this context
- // (shard/config servers) an empty WC indicates the operation should use the implicit
- // server defaults. So, warn if the operation has not specified writeConcern and is on
- // a shard/config server.
- if (!opCtx->getClient()->isInDirectClient() &&
- (!opCtx->inMultiDocumentTransaction() ||
- isTransactionCommand(command->getName()))) {
- if (isInternalClient) {
- // WriteConcern should always be explicitly specified by operations received
- // from internal clients (ie. from a mongos or mongod), even if it is empty
- // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }).
- uassert(
- 4569201,
- "received command without explicit writeConcern on an internalClient connection {}"_format(
- redact(request.body.toString())),
- request.body.hasField(WriteConcernOptions::kWriteConcernField));
- } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
- // TODO: Disabled until after SERVER-44539, to avoid log spam.
- // LOGV2(21959, "Missing writeConcern on {command}", "Missing "
- // "writeConcern on command", "command"_attr = command->getName());
- }
- }
- }
- extractedWriteConcern.emplace(
- uassertStatusOK(extractWriteConcern(opCtx, request.body, isInternalClient)));
- if (sessionOptions.getAutocommit()) {
- validateWriteConcernForTransaction(*extractedWriteConcern,
- invocation->definition()->getName());
- }
-
- // Ensure that the WC being set on the opCtx has provenance.
- invariant(extractedWriteConcern->getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on writeConcern: "
- << extractedWriteConcern->toBSON());
-
- opCtx->setWriteConcern(*extractedWriteConcern);
- }
-
- auto waitForWriteConcern = [&](auto&& bb) {
- bool reallyWait = true;
- failCommand.executeIf(
- [&](const BSONObj& data) {
- bb.append(data["writeConcernError"]);
- reallyWait = false;
- if (data.hasField(kErrorLabelsFieldName) &&
- data[kErrorLabelsFieldName].type() == Array) {
- // Propagate error labels specified in the failCommand failpoint to the
- // OperationContext decoration to override getErrorLabels() behaviors.
- invariant(!errorLabelsOverride(opCtx));
- errorLabelsOverride(opCtx).emplace(
- data.getObjectField(kErrorLabelsFieldName).getOwned());
- }
- },
- [&](const BSONObj& data) {
- return CommandHelpers::shouldActivateFailCommandFailPoint(
- data, invocation, opCtx->getClient()) &&
- data.hasField("writeConcernError");
- });
- if (reallyWait) {
- CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern());
- behaviors.waitForWriteConcern(opCtx, invocation, lastOpBeforeRun, bb);
- }
- };
-
- try {
- if (auto scoped = failWithErrorCodeInRunCommand.scoped();
- MONGO_unlikely(scoped.isActive())) {
- const auto errorCode = scoped.getData()["errorCode"].numberInt();
- LOGV2(21960,
- "failWithErrorCodeInRunCommand enabled - failing command with error "
- "code: {errorCode}",
- "failWithErrorCodeInRunCommand enabled, failing command",
- "errorCode"_attr = errorCode);
- BSONObjBuilder errorBuilder;
- errorBuilder.append("ok", 0.0);
- errorBuilder.append("code", errorCode);
- errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled.");
- replyBuilder->setCommandReply(errorBuilder.obj());
- } else if (shouldCheckOutSession) {
- invokeWithSessionCheckedOut(
- opCtx, request, invocation, sessionOptions, replyBuilder);
- } else {
- invokeWithNoSession(opCtx, request, invocation, replyBuilder);
- }
- } catch (const DBException& ex) {
- // Do no-op write before returning NoSuchTransaction if command has writeConcern.
- if (ex.toStatus().code() == ErrorCodes::NoSuchTransaction &&
- !opCtx->getWriteConcern().usedDefault) {
- TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error");
- }
- waitForWriteConcern(*extraFieldsBuilder);
- throw;
- }
-
- waitForWriteConcern(replyBuilder->getBodyBuilder());
-
- // With the exception of getMores inheriting the WriteConcern from the originating command,
- // nothing in run() should change the writeConcern.
- if (command->getLogicalOp() == LogicalOp::opGetMore) {
- dassert(!extractedWriteConcern,
- "opGetMore contained unexpected extracted write concern");
- } else {
- dassert(extractedWriteConcern, "no extracted write concern");
- dassert(opCtx->getWriteConcern() == extractedWriteConcern,
- "opCtx wc: {} extracted wc: {}"_format(
- opCtx->getWriteConcern().toBSON().jsonString(),
- extractedWriteConcern->toBSON().jsonString()));
- }
- } else {
- behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- if (shouldCheckOutSession) {
- invokeWithSessionCheckedOut(opCtx, request, invocation, sessionOptions, replyBuilder);
- } else {
- invokeWithNoSession(opCtx, request, invocation, replyBuilder);
- }
- }
+Future<bool> RunCommandImpl::_epilogue() {
+ auto execContext = _ecd->getExecutionContext();
+ auto opCtx = execContext->getOpCtx();
+ auto& request = execContext->getRequest();
+ auto command = execContext->getCommand();
+ auto replyBuilder = execContext->getReplyBuilder();
+ auto& behaviors = *execContext->behaviors;
// This fail point blocks all commands which are running on the specified namespace, or which
    // are present in the given list of commands. If no namespace or command list is provided, then
@@ -926,7 +1043,7 @@ bool runCommandImpl(OperationContext* opCtx,
// If 'ns' or 'commands' is not set, block for all the namespaces or commands
// respectively.
- return (ns.empty() || invocation->ns().ns() == ns) &&
+ return (ns.empty() || _ecd->getInvocation()->ns().ns() == ns) &&
(commands.empty() ||
std::any_of(commands.begin(), commands.end(), [&request](auto& element) {
return element.valueStringDataSafe() == request.getCommandName();
@@ -957,413 +1074,569 @@ bool runCommandImpl(OperationContext* opCtx,
if (response.hasField("writeConcernError")) {
wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
}
- appendErrorLabelsAndTopologyVersion(
- opCtx, &body, sessionOptions, command->getName(), code, wcCode, isInternalClient);
+ appendErrorLabelsAndTopologyVersion(opCtx,
+ &body,
+ _ecd->getSessionOptions(),
+ command->getName(),
+ code,
+ wcCode,
+ _isInternalClient());
}
auto commandBodyBob = replyBuilder->getBodyBuilder();
behaviors.appendReplyMetadata(opCtx, request, &commandBodyBob);
- appendClusterAndOperationTime(opCtx, &commandBodyBob, &commandBodyBob, startOperationTime);
+ appendClusterAndOperationTime(
+ opCtx, &commandBodyBob, &commandBodyBob, _ecd->getStartOperationTime());
return ok;
}
-/**
- * Executes a command after stripping metadata, performing authorization checks,
- * handling audit impersonation, and (potentially) setting maintenance mode. This method
- * also checks that the command is permissible to run on the node given its current
- * replication state. All the logic here is independent of any particular command; any
- * functionality relevant to a specific command should be confined to its run() method.
- */
-void execCommandDatabase(OperationContext* opCtx,
- Command* command,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder,
- const ServiceEntryPointCommon::Hooks& behaviors) {
- CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
- BSONObjBuilder extraFieldsBuilder;
- auto startOperationTime = getClientOperationTime(opCtx);
+Future<void> RunCommandImpl::_runCommand() {
+ auto execContext = _ecd->getExecutionContext();
+ invariant(!_shouldWaitForWriteConcern);
+ execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(
+ execContext->getRequest().body);
+ return std::make_shared<InvokeCommand>(_ecd)->run(_shouldCheckOutSession);
+}
- std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
- CommandInvocation::set(opCtx, invocation);
+void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) {
+ auto invocation = _rci->_ecd->getInvocation().get();
+ auto opCtx = _execContext->getOpCtx();
+ if (auto scoped = failCommand.scopedIf([&](const BSONObj& obj) {
+ return CommandHelpers::shouldActivateFailCommandFailPoint(
+ obj, invocation, opCtx->getClient()) &&
+ obj.hasField("writeConcernError");
+ });
+ MONGO_unlikely(scoped.isActive())) {
+ const BSONObj& data = scoped.getData();
+ bb.append(data["writeConcernError"]);
+ if (data.hasField(kErrorLabelsFieldName) && data[kErrorLabelsFieldName].type() == Array) {
+ // Propagate error labels specified in the failCommand failpoint to the
+ // OperationContext decoration to override getErrorLabels() behaviors.
+ invariant(!errorLabelsOverride(opCtx));
+ errorLabelsOverride(opCtx).emplace(
+ data.getObjectField(kErrorLabelsFieldName).getOwned());
+ }
+ return;
+ }
- OperationSessionInfoFromClient sessionOptions;
+ CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern());
+ _execContext->behaviors->waitForWriteConcern(opCtx, invocation, _lastOpBeforeRun.get(), bb);
+}
- const auto isInternalClient = opCtx->getClient()->session() &&
- (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::run(
+ std::shared_ptr<RunCommandImpl> rci) {
+ auto instance = std::make_shared<RunCommandAndWaitForWriteConcern>(std::move(rci));
+    // `_setup()` runs inline as part of preparing the future-chain, which runs the command,
+    // waits for write concern, and may throw.
+ instance->_setup();
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([instance] { return instance->_run(); })
+ .onCompletion([instance](Status status) {
+ return instance->_onRunCompletion(std::move(status));
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
- const auto isHello = command->getName() == "hello"_sd || command->getName() == "isMaster"_sd;
+void RunCommandImpl::RunCommandAndWaitForWriteConcern::_setup() {
+ auto invocation = _rci->_ecd->getInvocation();
+ OperationContext* opCtx = _execContext->getOpCtx();
+ const Command* command = invocation->definition();
+ const OpMsgRequest& request = _execContext->getRequest();
- try {
- const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
- Client* client = opCtx->getClient();
+ _lastOpBeforeRun.emplace(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
- {
- stdx::lock_guard<Client> lk(*client);
- CurOp::get(opCtx)->setCommand_inlock(command);
- APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient);
+ if (command->getLogicalOp() == LogicalOp::opGetMore) {
+ // WriteConcern will be set up during command processing, it must not be specified on
+ // the command body.
+ _execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(request.body);
+ } else {
+ // WriteConcern should always be explicitly specified by operations received on shard
+ // and config servers, even if it is empty (ie. writeConcern: {}). In this context
+ // (shard/config servers) an empty WC indicates the operation should use the implicit
+ // server defaults. So, warn if the operation has not specified writeConcern and is on
+ // a shard/config server.
+ if (!opCtx->getClient()->isInDirectClient() &&
+ (!opCtx->inMultiDocumentTransaction() || isTransactionCommand(command->getName()))) {
+ if (_rci->_isInternalClient()) {
+ // WriteConcern should always be explicitly specified by operations received
+ // from internal clients (ie. from a mongos or mongod), even if it is empty
+ // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }).
+ uassert(
+ 4569201,
+ "received command without explicit writeConcern on an internalClient connection {}"_format(
+ redact(request.body.toString())),
+ request.body.hasField(WriteConcernOptions::kWriteConcernField));
+ } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
+ // TODO: Disabled until after SERVER-44539, to avoid log spam.
+ // LOGV2(21959, "Missing writeConcern on {command}", "Missing "
+ // "writeConcern on command", "command"_attr = command->getName());
+ }
+ }
}
-
- if (isHello) {
- // Preload generic ClientMetadata ahead of our first hello request. After the first
- // request, metaElement should always be empty.
- auto metaElem = request.body[kMetadataDocumentName];
- ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem);
+ _extractedWriteConcern.emplace(
+ uassertStatusOK(extractWriteConcern(opCtx, request.body, _rci->_isInternalClient())));
+ if (_rci->_ecd->getSessionOptions().getAutocommit()) {
+ validateWriteConcernForTransaction(*_extractedWriteConcern,
+ invocation->definition()->getName());
}
- auto& apiParams = APIParameters::get(opCtx);
- auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
- if (auto clientMetadata = ClientMetadata::get(client)) {
- auto appName = clientMetadata->getApplicationName().toString();
- apiVersionMetrics.update(appName, apiParams);
+ // Ensure that the WC being set on the opCtx has provenance.
+ invariant(_extractedWriteConcern->getProvenance().hasSource(),
+ fmt::format("unexpected unset provenance on writeConcern: {}",
+ _extractedWriteConcern->toBSON().jsonString()));
+
+ opCtx->setWriteConcern(*_extractedWriteConcern);
+ }
+}
+
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() {
+ if (auto scoped = failWithErrorCodeInRunCommand.scoped(); MONGO_unlikely(scoped.isActive())) {
+ const auto errorCode = scoped.getData()["errorCode"].numberInt();
+ LOGV2(21960,
+ "failWithErrorCodeInRunCommand enabled - failing command with error "
+ "code: {errorCode}",
+ "failWithErrorCodeInRunCommand enabled, failing command",
+ "errorCode"_attr = errorCode);
+ BSONObjBuilder errorBuilder;
+ errorBuilder.append("ok", 0.0);
+ errorBuilder.append("code", errorCode);
+ errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled.");
+ _execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj());
+ return Status::OK();
+ }
+ return std::make_shared<InvokeCommand>(_rci->_ecd)->run(_rci->_shouldCheckOutSession);
+}
+
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) {
+ auto opCtx = _execContext->getOpCtx();
+ if (!status.isOK()) {
+ // Do no-op write before returning NoSuchTransaction if command has writeConcern.
+ if (status.code() == ErrorCodes::NoSuchTransaction &&
+ !opCtx->getWriteConcern().usedDefault) {
+ TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error");
}
+ _waitForWriteConcern(*_rci->_ecd->getExtraFieldsBuilder());
+ return status;
+ }
- sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) {
- auto numMillis = data["millis"].numberInt();
- auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();
- // Only sleep for one of the specified commands.
- if (commands.find(command->getName()) != commands.end()) {
- mongo::sleepmillis(numMillis);
- }
- });
+ auto bb = _execContext->getReplyBuilder()->getBodyBuilder();
+ _waitForWriteConcern(bb);
- rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
- rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
+ // With the exception of getMores inheriting the WriteConcern from the originating command,
+ // nothing in run() should change the writeConcern.
+ if (_execContext->getCommand()->getLogicalOp() == LogicalOp::opGetMore) {
+ dassert(!_extractedWriteConcern, "opGetMore contained unexpected extracted write concern");
+ } else {
+ dassert(_extractedWriteConcern, "no extracted write concern");
+ dassert(
+ opCtx->getWriteConcern() == _extractedWriteConcern,
+ "opCtx wc: {} extracted wc: {}"_format(opCtx->getWriteConcern().toBSON().jsonString(),
+ _extractedWriteConcern->toBSON().jsonString()));
+ }
+ return status;
+}
- auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+Future<void> RunCommandImpl::_makeFutureChain() {
+ return _prologue()
+ .then([this] {
+ if (_shouldWaitForWriteConcern)
+ return RunCommandAndWaitForWriteConcern::run(shared_from_this());
+ else
+ return _runCommand();
+ })
+ .then([this] { return _epilogue(); })
+ .onCompletion(
+ [this, anchor = shared_from_this()](StatusWith<bool> ranSuccessfully) -> Future<void> {
+ // Failure to run a command is either indicated by throwing an exception or adding a
+ // non-okay field to the replyBuilder. The input argument (i.e., `ranSuccessfully`)
+ // captures both cases. On success, it holds an okay status and a `true` value.
+ auto status = ranSuccessfully.getStatus();
+ if (status.isOK() && ranSuccessfully.getValue())
+ return Status::OK();
+
+ auto execContext = _ecd->getExecutionContext();
+ execContext->getCommand()->incrementCommandsFailed();
+ if (status.code() == ErrorCodes::Unauthorized) {
+ CommandHelpers::auditLogAuthEvent(execContext->getOpCtx(),
+ _ecd->getInvocation().get(),
+ execContext->getRequest(),
+ status.code());
+ }
+ return status;
+ });
+}
- sessionOptions = initializeOperationSessionInfo(
- opCtx,
- request.body,
- command->requiresAuth(),
- command->attachLogicalSessionsToOpCtx(),
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet);
+Future<void> ExecCommandDatabase::_initiateCommand() try {
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+ auto command = _execContext->getCommand();
+ auto replyBuilder = _execContext->getReplyBuilder();
- CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
+ const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
+ Client* client = opCtx->getClient();
- const auto dbname = request.getDatabase().toString();
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << dbname << "'",
- NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+ {
+ stdx::lock_guard<Client> lk(*client);
+ CurOp::get(opCtx)->setCommand_inlock(command);
+ APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient);
+ }
- ResourceConsumption::ScopedMetricsCollector scopedMetrics(
- opCtx, dbname, command->collectsResourceConsumptionMetrics());
-
- const auto allowTransactionsOnConfigDatabase =
- (serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
- serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
- validateSessionOptions(sessionOptions,
- command->getName(),
- invocation->ns(),
- allowTransactionsOnConfigDatabase);
-
- std::unique_ptr<MaintenanceModeSetter> mmSetter;
-
- BSONElement cmdOptionMaxTimeMSField;
- BSONElement maxTimeMSOpOnlyField;
- BSONElement allowImplicitCollectionCreationField;
- BSONElement helpField;
-
- StringMap<int> topLevelFields;
- for (auto&& element : request.body) {
- StringData fieldName = element.fieldNameStringData();
- if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
- cmdOptionMaxTimeMSField = element;
- } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) {
- uassert(ErrorCodes::InvalidOptions,
- "Can not specify maxTimeMSOpOnly for non internal clients",
- isInternalClient);
- maxTimeMSOpOnlyField = element;
- } else if (fieldName == "allowImplicitCollectionCreation") {
- allowImplicitCollectionCreationField = element;
- } else if (fieldName == CommandHelpers::kHelpFieldName) {
- helpField = element;
- } else if (fieldName == "comment") {
- opCtx->setComment(element.wrap());
- } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
- uasserted(ErrorCodes::InvalidOptions,
- "no such command option $maxTimeMs; use maxTimeMS instead");
- }
+ if (isHello()) {
+ // Preload generic ClientMetadata ahead of our first hello request. After the first
+ // request, metaElement should always be empty.
+ auto metaElem = request.body[kMetadataDocumentName];
+ ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem);
+ }
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Parsed command object contains duplicate top level key: "
- << fieldName,
- topLevelFields[fieldName]++ == 0);
- }
+ auto& apiParams = APIParameters::get(opCtx);
+ auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
+ if (auto clientMetadata = ClientMetadata::get(client)) {
+ auto appName = clientMetadata->getApplicationName().toString();
+ apiVersionMetrics.update(appName, apiParams);
+ }
- if (CommandHelpers::isHelpRequest(helpField)) {
- CurOp::get(opCtx)->ensureStarted();
- // We disable last-error for help requests due to SERVER-11492, because config
- // servers use help requests to determine which commands are database writes, and so
- // must be forwarded to all config servers.
- LastError::get(opCtx->getClient()).disable();
- Command::generateHelpResponse(opCtx, replyBuilder, *command);
- return;
+ sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) {
+ auto numMillis = data["millis"].numberInt();
+ auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();
+ // Only sleep for one of the specified commands.
+ if (commands.find(command->getName()) != commands.end()) {
+ mongo::sleepmillis(numMillis);
}
+ });
- ImpersonationSessionGuard guard(opCtx);
- invocation->checkAuthorization(opCtx, request);
+ rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
+ rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
- const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
+ auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!opCtx->getClient()->isInDirectClient() &&
- !MONGO_unlikely(skipCheckingForNotPrimaryInCommandDispatch.shouldFail())) {
- const bool inMultiDocumentTransaction = (sessionOptions.getAutocommit() == false);
- auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
- bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
- bool couldHaveOptedIn =
- allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction;
- bool optedIn =
- couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
- bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction);
- if (!canRunHere && couldHaveOptedIn) {
- const auto msg = client->supportsHello() ? "not primary and secondaryOk=false"_sd
- : "not master and slaveOk=false"_sd;
- uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, msg);
- }
+ _sessionOptions = initializeOperationSessionInfo(opCtx,
+ request.body,
+ command->requiresAuth(),
+ command->attachLogicalSessionsToOpCtx(),
+ replCoord->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeReplSet);
- if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) {
- uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere);
- } else {
- const auto msg = client->supportsHello() ? "not primary"_sd : "not master"_sd;
- uassert(ErrorCodes::NotWritablePrimary, msg, canRunHere);
- }
+ CommandHelpers::evaluateFailCommandFailPoint(opCtx, _invocation.get());
- if (!command->maintenanceOk() &&
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
- !replCoord->getMemberState().secondary()) {
-
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is recovering",
- !replCoord->getMemberState().recovering());
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is not in primary or recovering state",
- replCoord->getMemberState().primary());
- // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is in drain mode",
- optedIn || alwaysAllowed);
- }
- }
+ const auto dbname = request.getDatabase().toString();
+ uassert(ErrorCodes::InvalidNamespace,
+ fmt::format("Invalid database name: '{}'", dbname),
+ NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+
+ ResourceConsumption::ScopedMetricsCollector scopedMetrics(
+ opCtx, dbname, command->collectsResourceConsumptionMetrics());
+
+ const auto allowTransactionsOnConfigDatabase =
+ (serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
+ serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- if (command->adminOnly()) {
- LOGV2_DEBUG(21961,
- 2,
- "Admin only command: {command}",
- "Admin only command",
- "command"_attr = request.getCommandName());
+ validateSessionOptions(
+ _sessionOptions, command->getName(), _invocation->ns(), allowTransactionsOnConfigDatabase);
+
+ std::unique_ptr<MaintenanceModeSetter> mmSetter;
+
+ BSONElement cmdOptionMaxTimeMSField;
+ BSONElement maxTimeMSOpOnlyField;
+ BSONElement allowImplicitCollectionCreationField;
+ BSONElement helpField;
+
+ StringMap<int> topLevelFields;
+ for (auto&& element : request.body) {
+ StringData fieldName = element.fieldNameStringData();
+ if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
+ cmdOptionMaxTimeMSField = element;
+ } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Can not specify maxTimeMSOpOnly for non internal clients",
+ _isInternalClient());
+ maxTimeMSOpOnlyField = element;
+ } else if (fieldName == "allowImplicitCollectionCreation") {
+ allowImplicitCollectionCreationField = element;
+ } else if (fieldName == CommandHelpers::kHelpFieldName) {
+ helpField = element;
+ } else if (fieldName == "comment") {
+ opCtx->setComment(element.wrap());
+ } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
+ uasserted(ErrorCodes::InvalidOptions,
+ "no such command option $maxTimeMs; use maxTimeMS instead");
}
- if (command->maintenanceMode()) {
- mmSetter.reset(new MaintenanceModeSetter(opCtx));
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Parsed command object contains duplicate top level key: "
+ << fieldName,
+ topLevelFields[fieldName]++ == 0);
+ }
+
+ if (CommandHelpers::isHelpRequest(helpField)) {
+ CurOp::get(opCtx)->ensureStarted();
+ // We disable last-error for help requests due to SERVER-11492, because config servers use
+ // help requests to determine which commands are database writes, and so must be forwarded
+ // to all config servers.
+ LastError::get(opCtx->getClient()).disable();
+ Command::generateHelpResponse(opCtx, replyBuilder, *command);
+ return Status(ErrorCodes::SkipCommandExecution,
+ "Skipping command execution for help request");
+ }
+
+ ImpersonationSessionGuard guard(opCtx);
+ _invocation->checkAuthorization(opCtx, request);
+
+ const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ !MONGO_unlikely(skipCheckingForNotPrimaryInCommandDispatch.shouldFail())) {
+ const bool inMultiDocumentTransaction = (_sessionOptions.getAutocommit() == false);
+ auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
+ bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
+ bool couldHaveOptedIn =
+ allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction;
+ bool optedIn = couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
+ bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction);
+ if (!canRunHere && couldHaveOptedIn) {
+ const auto msg = client->supportsHello() ? "not primary and secondaryOk=false"_sd
+ : "not master and slaveOk=false"_sd;
+ uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, msg);
}
- if (command->shouldAffectCommandCounter()) {
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotCommand();
+ if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) {
+ uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere);
+ } else {
+ const auto msg = client->supportsHello() ? "not primary"_sd : "not master"_sd;
+ uassert(ErrorCodes::NotWritablePrimary, msg, canRunHere);
}
- // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation
- // on the OperationContext. The 'maxTimeMS' option unfortunately has a different meaning
- // for a getMore command, where it is used to communicate the maximum time to wait for
- // new inserts on tailable cursors, not as a deadline for the operation.
- // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will
- // require introducing a new 'max await time' parameter for getMore, and eventually
- // banning maxTimeMS altogether on a getMore command.
- int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
- int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField));
-
- // The "hello" or "isMaster" commands should not inherit the deadline from the user op it is
- // operating as a part of as that can interfere with replica set monitoring and host
- // selection.
- const bool ignoreMaxTimeMSOpOnly = isHello;
-
- if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) &&
- command->getLogicalOp() != LogicalOp::opGetMore) {
- uassert(40119,
- "Illegal attempt to set operation deadline within DBDirectClient",
- !opCtx->getClient()->isInDirectClient());
- if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 &&
- (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) {
- opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS});
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly},
- ErrorCodes::MaxTimeMSExpired);
- } else if (maxTimeMS > 0) {
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
- }
+ if (!command->maintenanceOk() &&
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
+ !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
+ !replCoord->getMemberState().secondary()) {
+
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is recovering",
+ !replCoord->getMemberState().recovering());
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is not in primary or recovering state",
+ replCoord->getMemberState().primary());
+ // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is in drain mode",
+ optedIn || alwaysAllowed);
}
+ }
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (command->adminOnly()) {
+ LOGV2_DEBUG(21961,
+ 2,
+ "Admin only command: {command}",
+ "Admin only command",
+ "command"_attr = request.getCommandName());
+ }
- // If the parent operation runs in a transaction, we don't override the read concern.
- auto skipReadConcern =
- opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();
- bool startTransaction = static_cast<bool>(sessionOptions.getStartTransaction());
- if (!skipReadConcern) {
- auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(
- opCtx, invocation.get(), request.body, startTransaction, isInternalClient));
+ if (command->maintenanceMode()) {
+ mmSetter.reset(new MaintenanceModeSetter(opCtx));
+ }
- // Ensure that the RC being set on the opCtx has provenance.
- invariant(newReadConcernArgs.getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on readConcern: "
- << newReadConcernArgs.toBSONInner());
+ if (command->shouldAffectCommandCounter()) {
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotCommand();
+ }
- uassert(ErrorCodes::InvalidOptions,
- "Only the first command in a transaction may specify a readConcern",
- startTransaction || !opCtx->inMultiDocumentTransaction() ||
- newReadConcernArgs.isEmpty());
-
- {
- // We must obtain the client lock to set the ReadConcernArgs on the operation
- // context as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = std::move(newReadConcernArgs);
- }
+ // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on the
+ // OperationContext. The 'maxTimeMS' option unfortunately has a different meaning for a getMore
+ // command, where it is used to communicate the maximum time to wait for new inserts on tailable
+ // cursors, not as a deadline for the operation.
+ // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will require
+ // introducing a new 'max await time' parameter for getMore, and eventually banning maxTimeMS
+ // altogether on a getMore command.
+ int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
+ int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField));
+
+ // The "hello" command should not inherit the deadline from the user op it is operating as a
+ // part of as that can interfere with replica set monitoring and host selection.
+ bool ignoreMaxTimeMSOpOnly = isHello();
+
+ if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && command->getLogicalOp() != LogicalOp::opGetMore) {
+ uassert(40119,
+ "Illegal attempt to set operation deadline within DBDirectClient",
+ !opCtx->getClient()->isInDirectClient());
+ if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 &&
+ (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) {
+ opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS});
+ opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly},
+ ErrorCodes::MaxTimeMSExpired);
+ } else if (maxTimeMS > 0) {
+ opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
}
+ }
- if (startTransaction) {
- opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
- opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
- }
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+
+ // If the parent operation runs in a transaction, we don't override the read concern.
+ auto skipReadConcern =
+ opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();
+ bool startTransaction = static_cast<bool>(_sessionOptions.getStartTransaction());
+ if (!skipReadConcern) {
+ auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(
+ opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient()));
+
+ // Ensure that the RC being set on the opCtx has provenance.
+ invariant(newReadConcernArgs.getProvenance().hasSource(),
+ str::stream() << "unexpected unset provenance on readConcern: "
+ << newReadConcernArgs.toBSONInner());
- if (opCtx->inMultiDocumentTransaction() && !startTransaction) {
- uassert(4937700,
- "API parameters are only allowed in the first command of a multi-document "
- "transaction",
- !APIParameters::get(opCtx).getParamsPassed());
+ uassert(ErrorCodes::InvalidOptions,
+ "Only the first command in a transaction may specify a readConcern",
+ startTransaction || !opCtx->inMultiDocumentTransaction() ||
+ newReadConcernArgs.isEmpty());
+
+ {
+ // We must obtain the client lock to set the ReadConcernArgs on the operation context as
+ // it may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = std::move(newReadConcernArgs);
}
+ }
- // Remember whether or not this operation is starting a transaction, in case something
- // later in the execution needs to adjust its behavior based on this.
- opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
+ if (startTransaction) {
+ opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ }
- auto& oss = OperationShardingState::get(opCtx);
+ if (opCtx->inMultiDocumentTransaction() && !startTransaction) {
+ uassert(4937700,
+ "API parameters are only allowed in the first command of a multi-document "
+ "transaction",
+ !APIParameters::get(opCtx).getParamsPassed());
+ }
- if (!opCtx->getClient()->isInDirectClient() &&
- readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
- (iAmPrimary ||
- (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
- oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body);
-
- auto const shardingState = ShardingState::get(opCtx);
- if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) {
- uassertStatusOK(shardingState->canAcceptShardedCommands());
- }
+ // Remember whether or not this operation is starting a transaction, in case something later in
+ // the execution needs to adjust its behavior based on this.
+ opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
- behaviors.advanceConfigOpTimeFromRequestMetadata(opCtx);
+ auto& oss = OperationShardingState::get(opCtx);
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
+ (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
+ oss.initializeClientRoutingVersionsFromCommand(_invocation->ns(), request.body);
+
+ auto const shardingState = ShardingState::get(opCtx);
+ if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) {
+ uassertStatusOK(shardingState->canAcceptShardedCommands());
}
- oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);
- auto scoped = behaviors.scopedOperationCompletionShardingActions(opCtx);
+ _execContext->behaviors->advanceConfigOpTimeFromRequestMetadata(opCtx);
+ }
- // This may trigger the maxTimeAlwaysTimeOut failpoint.
- auto status = opCtx->checkForInterruptNoAssert();
+ oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);
+ _scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx);
- // We still proceed if the primary stepped down, but accept other kinds of
- // interruptions. We defer to individual commands to allow themselves to be
- // interruptible by stepdowns, since commands like 'voteRequest' should conversely
- // continue executing.
- if (status != ErrorCodes::PrimarySteppedDown &&
- status != ErrorCodes::InterruptedDueToReplStateChange) {
- uassertStatusOK(status);
- }
+ // This may trigger the maxTimeAlwaysTimeOut failpoint.
+ auto status = opCtx->checkForInterruptNoAssert();
+ // We still proceed if the primary stepped down, but accept other kinds of interruptions. We
+ // defer to individual commands to allow themselves to be interruptible by stepdowns, since
+ // commands like 'voteRequest' should conversely continue executing.
+ if (status != ErrorCodes::PrimarySteppedDown &&
+ status != ErrorCodes::InterruptedDueToReplStateChange) {
+ uassertStatusOK(status);
+ }
- CurOp::get(opCtx)->ensureStarted();
+ CurOp::get(opCtx)->ensureStarted();
- command->incrementCommandsExecuted();
-
- if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) &&
- rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
- LOGV2_DEBUG_OPTIONS(4615605,
- 1,
- {logv2::LogComponent::kTracking},
- "Command metadata: {trackingMetadata}",
- "Command metadata",
- "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx));
- rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
- }
+ command->incrementCommandsExecuted();
- behaviors.waitForReadConcern(opCtx, invocation.get(), request);
- behaviors.setPrepareConflictBehaviorForReadConcern(opCtx, invocation.get());
-
- try {
- if (!runCommandImpl(opCtx,
- invocation.get(),
- request,
- replyBuilder,
- startOperationTime,
- behaviors,
- &extraFieldsBuilder,
- sessionOptions)) {
- command->incrementCommandsFailed();
- }
- } catch (const DBException& e) {
- command->incrementCommandsFailed();
- if (e.code() == ErrorCodes::Unauthorized) {
- CommandHelpers::auditLogAuthEvent(opCtx, invocation.get(), request, e.code());
- }
- throw;
- }
- } catch (const DBException& e) {
- behaviors.handleException(e, opCtx);
+ if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) &&
+ rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
+ LOGV2_DEBUG_OPTIONS(4615605,
+ 1,
+ {logv2::LogComponent::kTracking},
+ "Command metadata: {trackingMetadata}",
+ "Command metadata",
+ "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx));
+ rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
+ }
- // Append the error labels for transient transaction errors.
- auto response = extraFieldsBuilder.asTempObj();
- boost::optional<ErrorCodes::Error> wcCode;
- if (response.hasField("writeConcernError")) {
- wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
- }
- appendErrorLabelsAndTopologyVersion(opCtx,
- &extraFieldsBuilder,
- sessionOptions,
- command->getName(),
- e.code(),
- wcCode,
- isInternalClient);
+ _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
+ _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
+ return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
+}
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadata(opCtx, request, &metadataBob);
+Future<void> ExecCommandDatabase::_commandExec() {
+ return RunCommandImpl::run(shared_from_this());
+}
- // The read concern may not have yet been placed on the operation context, so attempt to
- // parse it here, so if it is valid it can be used to compute the proper operationTime.
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.isEmpty()) {
- auto readConcernArgsStatus = _extractReadConcern(opCtx,
- invocation.get(),
- request.body,
- false /*startTransaction*/,
- isInternalClient);
- if (readConcernArgsStatus.isOK()) {
- // We must obtain the client lock to set the ReadConcernArgs on the operation
- // context as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = readConcernArgsStatus.getValue();
- }
- }
- appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, startOperationTime);
+void ExecCommandDatabase::_handleFailure(Status status) {
+ // Absorb the exception as the command execution has already been skipped.
+ if (status.code() == ErrorCodes::SkipCommandExecution)
+ return;
- LOGV2_DEBUG(21962,
- 1,
- "Assertion while executing command '{command}' on database '{db}' with "
- "arguments '{commandArgs}': {error}",
- "Assertion while executing command",
- "command"_attr = request.getCommandName(),
- "db"_attr = request.getDatabase(),
- "commandArgs"_attr = redact(
- ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)),
- "error"_attr = redact(e.toString()));
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+ auto command = _execContext->getCommand();
+ auto replyBuilder = _execContext->getReplyBuilder();
+ const auto& behaviors = *_execContext->behaviors;
- generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), extraFieldsBuilder.obj());
+ behaviors.handleException(status, opCtx);
- if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(e.code())) {
- // Rethrow the exception to the top to signal that the client connection should be
- // closed.
- throw;
+ // Append the error labels for transient transaction errors.
+ auto response = _extraFieldsBuilder.asTempObj();
+ boost::optional<ErrorCodes::Error> wcCode;
+ if (response.hasField("writeConcernError")) {
+ wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
+ }
+ appendErrorLabelsAndTopologyVersion(opCtx,
+ &_extraFieldsBuilder,
+ _sessionOptions,
+ command->getName(),
+ status.code(),
+ wcCode,
+ _isInternalClient());
+
+ BSONObjBuilder metadataBob;
+ behaviors.appendReplyMetadata(opCtx, request, &metadataBob);
+
+ // The read concern may not have yet been placed on the operation context, so attempt to parse
+ // it here, so if it is valid it can be used to compute the proper operationTime.
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (readConcernArgs.isEmpty()) {
+ auto readConcernArgsStatus = _extractReadConcern(opCtx,
+ _invocation.get(),
+ request.body,
+ false /*startTransaction*/,
+ _isInternalClient());
+ if (readConcernArgsStatus.isOK()) {
+ // We must obtain the client lock to set the ReadConcernArgs on the operation context as
+ // it may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = readConcernArgsStatus.getValue();
}
}
+ appendClusterAndOperationTime(opCtx, &_extraFieldsBuilder, &metadataBob, _startOperationTime);
+
+ LOGV2_DEBUG(21962,
+ 1,
+ "Assertion while executing command '{command}' on database '{db}' with "
+ "arguments '{commandArgs}': {error}",
+ "Assertion while executing command",
+ "command"_attr = request.getCommandName(),
+ "db"_attr = request.getDatabase(),
+ "commandArgs"_attr = redact(
+ ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)),
+ "error"_attr = redact(status.toString()));
+
+ generateErrorResponse(
+ opCtx, replyBuilder, status, metadataBob.obj(), _extraFieldsBuilder.obj());
+
+ if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(status.code())) {
+ // Rethrow the exception to the top to signal that the client connection should be closed.
+ internalAssert(status);
+ }
}
/**
@@ -1383,114 +1656,94 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
curop->setNS_inlock(nss.ns());
}
-DbResponse receivedCommands(OperationContext* opCtx,
- const Message& message,
- const ServiceEntryPointCommon::Hooks& behaviors) {
- auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
- OpMsgRequest request;
- Command* c = nullptr;
- [&] {
- try { // Parse.
- request = rpc::opMsgRequestFromAnyProtocol(message);
- } catch (const DBException& ex) {
- // If this error needs to fail the connection, propagate it out.
- if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
- throw;
-
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);
-
- BSONObjBuilder extraFieldsBuilder;
- appendClusterAndOperationTime(
- opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
-
- // Otherwise, reply with the parse error. This is useful for cases where parsing fails
- // due to user-supplied input, such as the document too deep error. Since we failed
- // during parsing, we can't log anything about the command.
- LOGV2_DEBUG(21963,
- 1,
- "Assertion while parsing command: {error}",
- "Assertion while parsing command",
- "error"_attr = ex.toString());
-
- generateErrorResponse(
- opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());
-
- return; // From lambda. Don't try executing if parsing failed.
- }
-
- try { // Execute.
- curOpCommandSetup(opCtx, request);
-
- // In the absence of a Command object, no redaction is possible. Therefore
- // to avoid displaying potentially sensitive information in the logs,
- // we restrict the log message to the name of the unrecognized command.
- // However, the complete command object will still be echoed to the client.
- if (!(c = CommandHelpers::findCommand(request.getCommandName()))) {
- globalCommandRegistry()->incrementUnknownCommands();
- std::string msg = str::stream()
- << "no such command: '" << request.getCommandName() << "'";
- LOGV2_DEBUG(21964,
- 2,
- "No such command: {command}",
- "Command not found in registry",
- "command"_attr = request.getCommandName());
- uasserted(ErrorCodes::CommandNotFound, str::stream() << msg);
- }
-
- LOGV2_DEBUG(21965,
- 2,
- "Run command {db}.$cmd {commandArgs}",
- "About to run the command",
- "db"_attr = request.getDatabase(),
- "commandArgs"_attr = redact(
- ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body)));
-
- {
- // Try to set this as early as possible, as soon as we have figured out the
- // command.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
- }
-
- opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported));
+Future<void> parseCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) try {
+ execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage()));
+ return Status::OK();
+} catch (const DBException& ex) {
+ // Need to set request as `makeCommandResponse` expects an empty request on failure.
+ execContext->setRequest({});
+
+ // Otherwise, reply with the parse error. This is useful for cases where parsing fails due to
+ // user-supplied input, such as the document too deep error. Since we failed during parsing, we
+ // can't log anything about the command.
+ LOGV2_DEBUG(21963,
+ 1,
+ "Assertion while parsing command: {error}",
+ "Assertion while parsing command",
+ "error"_attr = ex.toString());
+
+ return ex.toStatus();
+}
- const auto session = opCtx->getClient()->session();
- if (session) {
- if (!opCtx->isExhaust() ||
- (c->getName() != "hello"_sd && c->getName() != "isMaster"_sd)) {
- InExhaustHello::get(session.get())
- ->setInExhaust(false, request.getCommandName());
+Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ auto [past, present] = makePromiseFuture<void>();
+ auto future =
+ std::move(present)
+ .then([execContext]() -> Future<void> {
+ // Prepare environment for command execution (e.g., find command object in registry)
+ auto opCtx = execContext->getOpCtx();
+ auto& request = execContext->getRequest();
+ curOpCommandSetup(opCtx, request);
+
+ // In the absence of a Command object, no redaction is possible. Therefore to avoid
+ // displaying potentially sensitive information in the logs, we restrict the log
+ // message to the name of the unrecognized command. However, the complete command
+ // object will still be echoed to the client.
+ if (execContext->setCommand(CommandHelpers::findCommand(request.getCommandName()));
+ !execContext->getCommand()) {
+ globalCommandRegistry()->incrementUnknownCommands();
+ LOGV2_DEBUG(21964,
+ 2,
+ "No such command: {command}",
+ "Command not found in registry",
+ "command"_attr = request.getCommandName());
+ return Status(ErrorCodes::CommandNotFound,
+ fmt::format("no such command: '{}'", request.getCommandName()));
}
- }
- execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);
- } catch (const DBException& ex) {
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);
+ Command* c = execContext->getCommand();
+ LOGV2_DEBUG(
+ 21965,
+ 2,
+ "Run command {db}.$cmd {commandArgs}",
+ "About to run the command",
+ "db"_attr = request.getDatabase(),
+ "commandArgs"_attr = redact(
+ ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body)));
- BSONObjBuilder extraFieldsBuilder;
- appendClusterAndOperationTime(
- opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
+ {
+ // Try to set this as early as possible, as soon as we have figured out the
+ // command.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
+ }
- LOGV2_DEBUG(21966,
- 1,
- "Assertion while executing command '{command}' on database '{db}': {error}",
- "Assertion while executing command",
- "command"_attr = request.getCommandName(),
- "db"_attr = request.getDatabase(),
- "error"_attr = ex.toString());
+ opCtx->setExhaust(
+ OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported));
- generateErrorResponse(
- opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());
+ return Status::OK();
+ })
+ .then([execContext] { return ExecCommandDatabase::run(std::move(execContext)); })
+ .tapError([execContext](Status status) {
+ LOGV2_DEBUG(
+ 21966,
+ 1,
+ "Assertion while executing command '{command}' on database '{db}': {error}",
+ "Assertion while executing command",
+ "command"_attr = execContext->getRequest().getCommandName(),
+ "db"_attr = execContext->getRequest().getDatabase(),
+ "error"_attr = status.toString());
+ });
+ past.emplaceValue();
+ return future;
+}
- if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(ex.code())) {
- // Rethrow the exception to the top to signal that the client connection should be
- // closed.
- throw;
- }
- }
- }();
+DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ auto opCtx = execContext->getOpCtx();
+ const Message& message = execContext->getMessage();
+ OpMsgRequest request = execContext->getRequest();
+ const Command* c = execContext->getCommand();
+ auto replyBuilder = execContext->getReplyBuilder();
if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) {
// Close the connection to get client to go through server selection again.
@@ -1524,6 +1777,38 @@ DbResponse receivedCommands(OperationContext* opCtx,
return dbResponse;
}
+Future<DbResponse> receivedCommands(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ execContext->setReplyBuilder(
+ rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage())));
+ return parseCommand(execContext)
+ .then([execContext]() { return executeCommand(std::move(execContext)); })
+ .onError([execContext](Status status) {
+ if (ErrorCodes::isConnectionFatalMessageParseError(status.code())) {
+ // If this error needs to fail the connection, propagate it out.
+ internalAssert(status);
+ }
+
+ auto opCtx = execContext->getOpCtx();
+ BSONObjBuilder metadataBob;
+ execContext->behaviors->appendReplyMetadataOnError(opCtx, &metadataBob);
+
+ BSONObjBuilder extraFieldsBuilder;
+ appendClusterAndOperationTime(
+ opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
+
+ auto replyBuilder = execContext->getReplyBuilder();
+ generateErrorResponse(
+ opCtx, replyBuilder, status, metadataBob.obj(), extraFieldsBuilder.obj());
+
+ if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(status.code())) {
+ // Return the exception to the top to signal that the client connection should be
+ // closed.
+ internalAssert(status);
+ }
+ })
+ .then([execContext] { return makeCommandResponse(std::move(execContext)); });
+}
+
DbResponse receivedQuery(OperationContext* opCtx,
const NamespaceString& nss,
Client& c,
@@ -1551,7 +1836,7 @@ DbResponse receivedQuery(OperationContext* opCtx,
dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response);
} catch (const AssertionException& e) {
- behaviors.handleException(e, opCtx);
+ behaviors.handleException(e.toStatus(), opCtx);
dbResponse.response.reset();
generateLegacyQueryErrorResponse(e, q, &op, &dbResponse.response);
@@ -1742,32 +2027,40 @@ DbResponse receivedGetMore(OperationContext* opCtx,
struct CommandOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors);
- // The hello/isMaster commands should take kMaxAwaitTimeMs at most, log if it takes twice
- // that.
- if (auto command = hr->currentOp().getCommand();
- command && (command->getName() == "hello" || command->getName() == "isMaster")) {
- hr->slowMsOverride =
- 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
- }
- return r;
+ Future<DbResponse> run() override {
+ return receivedCommands(executionContext);
}
};
-struct QueryOpRunner : HandleRequest::OpRunner {
+// Allows wrapping synchronous code in futures without repeating the try-catch block.
+struct SynchronousOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- hr->opCtx->markKillOnClientDisconnect();
- return receivedQuery(
- hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors);
+ virtual DbResponse runSync() = 0;
+ Future<DbResponse> run() final try { return runSync(); } catch (const DBException& ex) {
+ return ex.toStatus();
}
};
-struct GetMoreOpRunner : HandleRequest::OpRunner {
- using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog);
+struct QueryOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ auto opCtx = executionContext->getOpCtx();
+ opCtx->markKillOnClientDisconnect();
+ return receivedQuery(opCtx,
+ executionContext->nsString(),
+ executionContext->client(),
+ executionContext->getMessage(),
+ *executionContext->behaviors);
+ }
+};
+
+struct GetMoreOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ return receivedGetMore(executionContext->getOpCtx(),
+ executionContext->getMessage(),
+ executionContext->currentOp(),
+ &executionContext->forceLog);
}
};
@@ -1777,49 +2070,70 @@ struct GetMoreOpRunner : HandleRequest::OpRunner {
* class provides a `run` that calls it and handles error reporting
* via the `LastError` slot.
*/
-struct FireAndForgetOpRunner : HandleRequest::OpRunner {
- using HandleRequest::OpRunner::OpRunner;
+struct FireAndForgetOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
virtual void runAndForget() = 0;
- DbResponse run() final;
+ DbResponse runSync() final;
};
struct KillCursorsOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->currentOp().ensureStarted();
- hr->slowMsOverride = 10;
- receivedKillCursors(hr->opCtx, hr->m);
+ executionContext->currentOp().ensureStarted();
+ executionContext->slowMsOverride = 10;
+ receivedKillCursors(executionContext->getOpCtx(), executionContext->getMessage());
}
};
struct InsertOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedInsert(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedInsert(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
}
};
struct UpdateOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedUpdate(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedUpdate(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
}
};
struct DeleteOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedDelete(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedDelete(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
+ }
+};
+
+struct UnsupportedOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ // For compatibility reasons, we only log incidents of receiving operations that are not
+ // supported and return an empty response to the caller.
+ LOGV2(21968,
+ "Operation isn't supported: {operation}",
+ "Operation is not supported",
+ "operation"_attr = static_cast<int>(executionContext->op()));
+ executionContext->currentOp().done();
+ executionContext->forceLog = true;
+ return {};
}
};
std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
- switch (op()) {
+ switch (executionContext->op()) {
case dbQuery:
- if (!nsString().isCommand())
+ if (!executionContext->nsString().isCommand())
return std::make_unique<QueryOpRunner>(this);
// FALLTHROUGH: it's a query containing a command
case dbMsg:
@@ -1835,96 +2149,93 @@ std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
case dbDelete:
return std::make_unique<DeleteOpRunner>(this);
default:
- LOGV2(21968,
- "Operation isn't supported: {operation}",
- "Operation is not supported",
- "operation"_attr = static_cast<int>(op()));
- return nullptr;
+ return std::make_unique<UnsupportedOpRunner>(this);
}
}
-DbResponse FireAndForgetOpRunner::run() {
+DbResponse FireAndForgetOpRunner::runSync() {
try {
runAndForget();
} catch (const AssertionException& ue) {
- LastError::get(hr->client()).setLastError(ue.code(), ue.reason());
+ LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason());
LOGV2_DEBUG(21969,
3,
"Caught Assertion in {networkOp}, continuing: {error}",
"Assertion in fire-and-forget operation",
- "networkOp"_attr = networkOpToString(hr->op()),
+ "networkOp"_attr = networkOpToString(executionContext->op()),
"error"_attr = redact(ue));
- hr->currentOp().debug().errInfo = ue.toStatus();
+ executionContext->currentOp().debug().errInfo = ue.toStatus();
}
// A NotWritablePrimary error can be set either within
// receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above.
// Either way, we want to throw an exception here, which will cause the client to be
// disconnected.
- if (LastError::get(hr->client()).hadNotPrimaryError()) {
+ if (LastError::get(executionContext->client()).hadNotPrimaryError()) {
notPrimaryLegacyUnackWrites.increment();
uasserted(ErrorCodes::NotWritablePrimary,
str::stream() << "Not-master error while processing '"
- << networkOpToString(hr->op()) << "' operation on '"
- << hr->nsString() << "' namespace via legacy "
+ << networkOpToString(executionContext->op()) << "' operation on '"
+ << executionContext->nsString() << "' namespace via legacy "
<< "fire-and-forget command execution.");
}
return {};
}
-void HandleRequest::startOperation() {
- if (client().isInDirectClient()) {
+Future<void> HandleRequest::startOperation() try {
+ auto opCtx = executionContext->getOpCtx();
+ auto& client = executionContext->client();
+ auto& currentOp = executionContext->currentOp();
+
+ if (client.isInDirectClient()) {
if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) {
invariant(!opCtx->inMultiDocumentTransaction() &&
!opCtx->lockState()->inAWriteUnitOfWork());
}
} else {
- LastError::get(client()).startRequest();
- AuthorizationSession::get(client())->startRequest(opCtx);
+ LastError::get(client).startRequest();
+ AuthorizationSession::get(client)->startRequest(opCtx);
// We should not be holding any locks at this point
invariant(!opCtx->lockState()->isLocked());
}
{
- stdx::lock_guard<Client> lk(client());
+ stdx::lock_guard<Client> lk(client);
// Commands handling code will reset this if the operation is a command
// which is logically a basic CRUD operation like query, insert, etc.
- currentOp().setNetworkOp_inlock(op());
- currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op()));
+ currentOp.setNetworkOp_inlock(executionContext->op());
+ currentOp.setLogicalOp_inlock(networkOpToLogicalOp(executionContext->op()));
}
+ return {};
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
-DbResponse HandleRequest::run() {
- startOperation();
- DbResponse dbresponse;
- if (auto opRunner = makeOpRunner()) {
- dbresponse = opRunner->run();
- } else {
- currentOp().done();
- forceLog = true;
- }
- completeOperation(dbresponse);
- return dbresponse;
-}
+Future<void> HandleRequest::completeOperation() try {
+ auto opCtx = executionContext->getOpCtx();
+ auto& currentOp = executionContext->currentOp();
-void HandleRequest::completeOperation(const DbResponse& dbresponse) {
// Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether
// this op should be written to the profiler.
- const bool shouldProfile = currentOp().completeAndLogOperation(
- opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog);
+ const bool shouldProfile =
+ currentOp.completeAndLogOperation(opCtx,
+ MONGO_LOGV2_DEFAULT_COMPONENT,
+ executionContext->getResponse().response.size(),
+ executionContext->slowMsOverride,
+ executionContext->forceLog);
Top::get(opCtx->getServiceContext())
.incrementGlobalLatencyStats(
opCtx,
- durationCount<Microseconds>(currentOp().elapsedTimeExcludingPauses()),
- currentOp().getReadWriteType());
+ durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()),
+ currentOp.getReadWriteType());
if (shouldProfile) {
// Performance profiling is on
if (opCtx->lockState()->isReadLocked()) {
LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock");
- } else if (client().isInDirectClient()) {
+ } else if (executionContext->client().isInDirectClient()) {
LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient");
- } else if (behaviors.lockedForWriting()) {
+ } else if (executionContext->behaviors->lockedForWriting()) {
// TODO SERVER-26825: Fix race condition where fsyncLock is acquired post
// lockedForWriting() call but prior to profile collection lock acquisition.
LOGV2_DEBUG(21972, 1, "Note: not profiling because doing fsync+lock");
@@ -1932,11 +2243,14 @@ void HandleRequest::completeOperation(const DbResponse& dbresponse) {
LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only");
} else {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
- profile(opCtx, op());
+ profile(opCtx, executionContext->op());
}
}
recordCurOpMetrics(opCtx);
+ return {};
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
} // namespace
@@ -1950,15 +2264,27 @@ BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* comman
return bob.obj();
}
-Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
- const Message& m,
- const Hooks& behaviors) noexcept {
- try {
- return Future<DbResponse>::makeReady(HandleRequest{opCtx, m, behaviors}.run());
- } catch (DBException& e) {
- LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e));
- return e.toStatus();
- }
+Future<DbResponse> ServiceEntryPointCommon::handleRequest(
+ OperationContext* opCtx, const Message& m, std::unique_ptr<const Hooks> behaviors) noexcept {
+ auto hr = std::make_shared<HandleRequest>(opCtx, m, std::move(behaviors));
+ return hr->startOperation()
+ .then([hr]() -> Future<void> {
+ auto opRunner = hr->makeOpRunner();
+ invariant(opRunner);
+ return opRunner->run().then(
+ [execContext = hr->executionContext](DbResponse response) -> void {
+ // Set the response upon successful execution
+ execContext->setResponse(std::move(response));
+ });
+ })
+ .then([hr] { return hr->completeOperation(); })
+ .onCompletion([hr](Status status) -> Future<DbResponse> {
+ if (!status.isOK()) {
+ LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(status));
+ return status;
+ }
+ return hr->executionContext->getResponse();
+ });
}
ServiceEntryPointCommon::Hooks::~Hooks() = default;
diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h
index a5623e41c88..dbf95b165a2 100644
--- a/src/mongo/db/service_entry_point_common.h
+++ b/src/mongo/db/service_entry_point_common.h
@@ -84,7 +84,7 @@ struct ServiceEntryPointCommon {
virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0;
- virtual void handleException(const DBException& e, OperationContext* opCtx) const = 0;
+ virtual void handleException(const Status& status, OperationContext* opCtx) const = 0;
virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0;
@@ -101,7 +101,7 @@ struct ServiceEntryPointCommon {
static Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& m,
- const Hooks& hooks) noexcept;
+ std::unique_ptr<const Hooks> hooks) noexcept;
/**
* Produce a new object based on cmdObj, but with redactions applied as specified by
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 75b28b6595a..4347ffa8ac4 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -182,9 +182,9 @@ public:
CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj);
}
- void handleException(const DBException& e, OperationContext* opCtx) const override {
+ void handleException(const Status& status, OperationContext* opCtx) const override {
// If we got a stale config, wait in case the operation is stuck in a critical section
- if (auto sce = e.extraInfo<StaleConfigInfo>()) {
+ if (auto sce = status.extraInfo<StaleConfigInfo>()) {
// A config server acting as a router may return a StaleConfig exception, but a config
// server won't contain data for a sharded collection, so skip handling the exception.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
@@ -204,7 +204,7 @@ public:
onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived())
.ignore();
}
- } else if (auto sce = e.extraInfo<StaleDbRoutingVersion>()) {
+ } else if (auto sce = status.extraInfo<StaleDbRoutingVersion>()) {
if (!opCtx->getClient()->isInDirectClient()) {
onDbVersionMismatchNoExcept(
opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted())
@@ -271,7 +271,7 @@ public:
Future<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx,
const Message& m) noexcept {
- return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{});
+ return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());
}
} // namespace mongo