diff options
-rw-r--r-- | src/mongo/base/error_codes.yml | 2 | ||||
-rw-r--r-- | src/mongo/db/commands.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.h | 48 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 23 |
6 files changed, 151 insertions, 55 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 5d38573d087..05aaaaa15f4 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -403,6 +403,8 @@ error_codes: - {code: 328, name: SkipCommandExecution} + - {code: 329, name: FailedToRunWithReplyBuilder} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 8d9a8de296b..0065995b640 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -184,6 +184,20 @@ void CommandHelpers::runCommandInvocation(OperationContext* opCtx, } } +Future<void> CommandHelpers::runCommandInvocationAsync( + std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation) try { + auto hooks = getCommandInvocationHooksHandle(rec->getOpCtx()->getServiceContext()); + if (hooks) + hooks->onBeforeAsyncRun(rec, invocation.get()); + return invocation->runAsync(rec).then([rec, hooks, invocation] { + if (hooks) + hooks->onAfterAsyncRun(rec, invocation.get()); + }); +} catch (const DBException& e) { + return e.toStatus(); +} + void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx, const CommandInvocation* invocation, const OpMsgRequest& request, @@ -787,6 +801,16 @@ private: } } + Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) override { + return _command->runAsync(rec, _dbName).onError([rec](Status status) { + if (status.code() != ErrorCodes::FailedToRunWithReplyBuilder) + return status; + BSONObjBuilder bob = rec->getReplyBuilder()->getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bob, false); + return Status::OK(); + }); + } + void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 06803dbad84..790c6123f8e 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -30,6 +30,7 @@ #pragma once #include <boost/optional.hpp> +#include <fmt/format.h> #include <functional> #include <string> #include <vector> @@ -47,10 +48,12 @@ #include "mongo/db/query/explain.h" #include "mongo/db/read_concern_support_result.h" #include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/request_execution_context.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/op_msg.h" #include "mongo/rpc/reply_builder_interface.h" #include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/string_map.h" namespace mongo { @@ -92,11 +95,27 @@ public: CommandInvocation* invocation) = 0; /** + * A behavior to perform before CommandInvocation::asyncRun(). Defaults to `onBeforeRun(...)`. + */ + virtual void onBeforeAsyncRun(std::shared_ptr<RequestExecutionContext> rec, + CommandInvocation* invocation) { + onBeforeRun(rec->getOpCtx(), rec->getRequest(), invocation); + } + + /** * A behavior to perform after CommandInvocation::run() */ virtual void onAfterRun(OperationContext* opCtx, const OpMsgRequest& request, CommandInvocation* invocation) = 0; + + /** + * A behavior to perform after CommandInvocation::asyncRun(). Defaults to `onAfterRun(...)`. + */ + virtual void onAfterAsyncRun(std::shared_ptr<RequestExecutionContext> rec, + CommandInvocation* invocation) { + onAfterRun(rec->getOpCtx(), rec->getRequest(), invocation); + } }; // Various helpers unrelated to any single command or to the command registry. @@ -236,6 +255,15 @@ struct CommandHelpers { rpc::ReplyBuilderInterface* response); /** + * Runs a previously parsed command and propagates the result to the ReplyBuilderInterface. For + * commands that do not offer an implementation tailored for asynchronous execution, the future + * schedules the execution of the default implementation, historically designed for synchronous + * execution. + */ + static Future<void> runCommandInvocationAsync(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation); + + /** * If '!invocation', we're logging about a Command pre-parse. It has to punt on the logged * namespace, giving only the request's $db. Since the Command hasn't parsed the request body, * we can't know the collection part of that namespace, so we leave it blank in the audit log. @@ -561,6 +589,16 @@ public: */ virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0; + /** + * Returns a future that can schedule asynchronous execution of the command. By default, the + * future falls back to the execution of `run(...)`, thus the default semantics of + * `runAsync(...)` is identical to that of `run(...). + */ + virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) { + run(rec->getOpCtx(), rec->getReplyBuilder()); + return Status::OK(); + } + virtual void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) { @@ -698,6 +736,18 @@ public: rpc::ReplyBuilderInterface* replyBuilder) = 0; /** + * Provides a future that may run the command asynchronously. By default, it falls back to + * runWithReplyBuilder. + */ + virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec, std::string db) { + if (!runWithReplyBuilder( + rec->getOpCtx(), db, rec->getRequest().body, rec->getReplyBuilder())) + return Status(ErrorCodes::FailedToRunWithReplyBuilder, + fmt::format("Failed to run command: {}", rec->getCommand()->getName())); + return Status::OK(); + } + + /** * Commands which can be explained override this method. Any operation which has a query * part and executes as a tree of execution stages can be explained. A command should * implement explain by: diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index 906655a12f1..d329f1f1a10 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -41,6 +41,7 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" @@ -209,6 +210,64 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName) { } } +class MigrationConflictHandler : public std::enable_shared_from_this<MigrationConflictHandler> { +public: + MigrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec, + unique_function<Future<void>()> callable) + : _rec(std::move(rec)), _callable(std::move(callable)) {} + + Future<void> run() try { + checkIfCanReadOrBlock(_rec->getOpCtx(), _rec->getRequest().getDatabase()); + // callable will modify replyBuilder. + return _callable() + .then([this, anchor = shared_from_this()] { _checkReplyForTenantMigrationConflict(); }) + .onError<ErrorCodes::TenantMigrationConflict>( + [this, anchor = shared_from_this()](Status status) { + _handleTenantMigrationConflict(std::move(status)); + return Status::OK(); + }); + } catch (const DBException& e) { + return e.toStatus(); + } + +private: + void _checkReplyForTenantMigrationConflict() { + auto replyBodyBuilder = _rec->getReplyBuilder()->getBodyBuilder(); + + // getStatusFromWriteCommandReply expects an 'ok' field. + CommandHelpers::extractOrAppendOk(replyBodyBuilder); + + // Commands such as insert, update, delete, and applyOps return the result as a status + // rather than throwing. + const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); + + // Only throw `TenantMigrationConflict` exceptions. + if (status == ErrorCodes::TenantMigrationConflict) + internalAssert(status); + } + + void _handleTenantMigrationConflict(Status status) { + auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>(); + invariant(migrationConflictInfo); + + auto& mtabByPrefix = + TenantMigrationAccessBlockerByPrefix::get(_rec->getOpCtx()->getServiceContext()); + if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix( + migrationConflictInfo->getDatabasePrefix())) { + _rec->getReplyBuilder()->getBodyBuilder().resetToEmpty(); + mtab->checkIfCanWriteOrBlock(_rec->getOpCtx()); + } + } + + const std::shared_ptr<RequestExecutionContext> _rec; + const unique_function<Future<void>()> _callable; +}; + +Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec, + unique_function<Future<void>()> callable) { + return std::make_shared<MigrationConflictHandler>(std::move(rec), std::move(callable))->run(); +} + } // namespace tenant_migration_donor } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 6c47c594621..2334f3fda77 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -35,9 +35,12 @@ #include "mongo/db/repl/tenant_migration_access_blocker_by_prefix.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/request_execution_context.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/reply_builder_interface.h" +#include "mongo/util/functional.h" +#include "mongo/util/future.h" namespace mongo { @@ -73,46 +76,13 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat void onWriteToDatabase(OperationContext* opCtx, StringData dbName); /** - * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated - * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then - * throws TenantMigrationCommitted or TenantMigrationAborted. + * Returns a future that asynchronously schedules and runs the argument function 'callable'. If it + * throws a TenantMigrationConflict error (as indicated in 'replyBuilder'), clears 'replyBuilder' + * and blocks until the migration commits or aborts, then returns TenantMigrationCommitted or + * TenantMigrationAborted. */ -template <typename Callable> -void migrationConflictHandler(OperationContext* opCtx, - StringData dbName, - Callable&& callable, - rpc::ReplyBuilderInterface* replyBuilder) { - checkIfCanReadOrBlock(opCtx, dbName); - - auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - - try { - // callable will modify replyBuilder. - callable(); - auto replyBodyBuilder = replyBuilder->getBodyBuilder(); - - // getStatusFromWriteCommandReply expects an 'ok' field. - CommandHelpers::extractOrAppendOk(replyBodyBuilder); - - // Commands such as insert, update, delete, and applyOps return the result as a status - // rather than throwing. - const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); - - if (status == ErrorCodes::TenantMigrationConflict) { - uassertStatusOK(status); - } - return; - } catch (const TenantMigrationConflictException& ex) { - auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>(); - invariant(migrationConflictInfo); - - if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix( - migrationConflictInfo->getDatabasePrefix())) { - replyBuilder->getBodyBuilder().resetToEmpty(); - mtab->checkIfCanWriteOrBlock(opCtx); - } - } -} +Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec, + unique_function<Future<void>()> callable); } // namespace tenant_migration_donor diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 6b17d5fff44..2b20ed67bab 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -735,7 +735,7 @@ public: private: class SessionCheckoutPath; - Future<void> _runInvocation(); + Future<void> _runInvocation() noexcept; const std::shared_ptr<ExecCommandDatabase> _ecd; }; @@ -911,22 +911,13 @@ Future<void> InvokeCommand::SessionCheckoutPath::_checkOutSession() { return Status::OK(); } -Future<void> InvokeCommand::_runInvocation() try { +Future<void> InvokeCommand::_runInvocation() noexcept { auto execContext = _ecd->getExecutionContext(); - OperationContext* opCtx = execContext->getOpCtx(); - const OpMsgRequest& request = execContext->getRequest(); - CommandInvocation* invocation = _ecd->getInvocation().get(); - rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); - - tenant_migration_donor::migrationConflictHandler( - opCtx, - request.getDatabase(), - [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, - replyBuilder); - - return Status::OK(); -} catch (const DBException& ex) { - return ex.toStatus(); + return tenant_migration_donor::migrationConflictHandler( + execContext, [execContext, invocation = _ecd->getInvocation()] { + return CommandHelpers::runCommandInvocationAsync(std::move(execContext), + std::move(invocation)); + }); } void InvokeCommand::SessionCheckoutPath::_tapError(Status status) { |