summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-14 21:34:40 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-19 22:44:45 +0000
commit18f1d000290ef77758455bbbe9c5de174614c3e7 (patch)
tree9a7fc13ed440000744aaa8af11c34a0d6d14e50d
parentfa29e47f37da2353b49ee71c907026b769fdc607 (diff)
downloadmongo-18f1d000290ef77758455bbbe9c5de174614c3e7.tar.gz
SERVER-49107 Futurize migrationConflictHandler and runCommandInvocation
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/commands.cpp24
-rw-r--r--src/mongo/db/commands.h50
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp59
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h48
-rw-r--r--src/mongo/db/service_entry_point_common.cpp23
6 files changed, 151 insertions, 55 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 5d38573d087..05aaaaa15f4 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -403,6 +403,8 @@ error_codes:
- {code: 328, name: SkipCommandExecution}
+ - {code: 329, name: FailedToRunWithReplyBuilder}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index 8d9a8de296b..0065995b640 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -184,6 +184,20 @@ void CommandHelpers::runCommandInvocation(OperationContext* opCtx,
}
}
+Future<void> CommandHelpers::runCommandInvocationAsync(
+ std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation) try {
+ auto hooks = getCommandInvocationHooksHandle(rec->getOpCtx()->getServiceContext());
+ if (hooks)
+ hooks->onBeforeAsyncRun(rec, invocation.get());
+ return invocation->runAsync(rec).then([rec, hooks, invocation] {
+ if (hooks)
+ hooks->onAfterAsyncRun(rec, invocation.get());
+ });
+} catch (const DBException& e) {
+ return e.toStatus();
+}
+
void CommandHelpers::auditLogAuthEvent(OperationContext* opCtx,
const CommandInvocation* invocation,
const OpMsgRequest& request,
@@ -787,6 +801,16 @@ private:
}
}
+ Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) override {
+ return _command->runAsync(rec, _dbName).onError([rec](Status status) {
+ if (status.code() != ErrorCodes::FailedToRunWithReplyBuilder)
+ return status;
+ BSONObjBuilder bob = rec->getReplyBuilder()->getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bob, false);
+ return Status::OK();
+ });
+ }
+
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 06803dbad84..790c6123f8e 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -30,6 +30,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <fmt/format.h>
#include <functional>
#include <string>
#include <vector>
@@ -47,10 +48,12 @@
#include "mongo/db/query/explain.h"
#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -92,11 +95,27 @@ public:
CommandInvocation* invocation) = 0;
/**
+ * A behavior to perform before CommandInvocation::asyncRun(). Defaults to `onBeforeRun(...)`.
+ */
+ virtual void onBeforeAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
+ CommandInvocation* invocation) {
+ onBeforeRun(rec->getOpCtx(), rec->getRequest(), invocation);
+ }
+
+ /**
* A behavior to perform after CommandInvocation::run()
*/
virtual void onAfterRun(OperationContext* opCtx,
const OpMsgRequest& request,
CommandInvocation* invocation) = 0;
+
+ /**
+ * A behavior to perform after CommandInvocation::asyncRun(). Defaults to `onAfterRun(...)`.
+ */
+ virtual void onAfterAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
+ CommandInvocation* invocation) {
+ onAfterRun(rec->getOpCtx(), rec->getRequest(), invocation);
+ }
};
// Various helpers unrelated to any single command or to the command registry.
@@ -236,6 +255,15 @@ struct CommandHelpers {
rpc::ReplyBuilderInterface* response);
/**
+ * Runs a previously parsed command and propagates the result to the ReplyBuilderInterface. For
+ * commands that do not offer an implementation tailored for asynchronous execution, the future
+ * schedules the execution of the default implementation, historically designed for synchronous
+ * execution.
+ */
+ static Future<void> runCommandInvocationAsync(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation);
+
+ /**
* If '!invocation', we're logging about a Command pre-parse. It has to punt on the logged
* namespace, giving only the request's $db. Since the Command hasn't parsed the request body,
* we can't know the collection part of that namespace, so we leave it blank in the audit log.
@@ -561,6 +589,16 @@ public:
*/
virtual void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) = 0;
+ /**
+ * Returns a future that can schedule asynchronous execution of the command. By default, the
+ * future falls back to the execution of `run(...)`, thus the default semantics of
+ * `runAsync(...)` is identical to that of `run(...).
+ */
+ virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec) {
+ run(rec->getOpCtx(), rec->getReplyBuilder());
+ return Status::OK();
+ }
+
virtual void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) {
@@ -698,6 +736,18 @@ public:
rpc::ReplyBuilderInterface* replyBuilder) = 0;
/**
+ * Provides a future that may run the command asynchronously. By default, it falls back to
+ * runWithReplyBuilder.
+ */
+ virtual Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec, std::string db) {
+ if (!runWithReplyBuilder(
+ rec->getOpCtx(), db, rec->getRequest().body, rec->getReplyBuilder()))
+ return Status(ErrorCodes::FailedToRunWithReplyBuilder,
+ fmt::format("Failed to run command: {}", rec->getCommand()->getName()));
+ return Status::OK();
+ }
+
+ /**
* Commands which can be explained override this method. Any operation which has a query
* part and executes as a tree of execution stages can be explained. A command should
* implement explain by:
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index 906655a12f1..d329f1f1a10 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -41,6 +41,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
@@ -209,6 +210,64 @@ void onWriteToDatabase(OperationContext* opCtx, StringData dbName) {
}
}
+class MigrationConflictHandler : public std::enable_shared_from_this<MigrationConflictHandler> {
+public:
+ MigrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable)
+ : _rec(std::move(rec)), _callable(std::move(callable)) {}
+
+ Future<void> run() try {
+ checkIfCanReadOrBlock(_rec->getOpCtx(), _rec->getRequest().getDatabase());
+ // callable will modify replyBuilder.
+ return _callable()
+ .then([this, anchor = shared_from_this()] { _checkReplyForTenantMigrationConflict(); })
+ .onError<ErrorCodes::TenantMigrationConflict>(
+ [this, anchor = shared_from_this()](Status status) {
+ _handleTenantMigrationConflict(std::move(status));
+ return Status::OK();
+ });
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+
+private:
+ void _checkReplyForTenantMigrationConflict() {
+ auto replyBodyBuilder = _rec->getReplyBuilder()->getBodyBuilder();
+
+ // getStatusFromWriteCommandReply expects an 'ok' field.
+ CommandHelpers::extractOrAppendOk(replyBodyBuilder);
+
+ // Commands such as insert, update, delete, and applyOps return the result as a status
+ // rather than throwing.
+ const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
+
+ // Only throw `TenantMigrationConflict` exceptions.
+ if (status == ErrorCodes::TenantMigrationConflict)
+ internalAssert(status);
+ }
+
+ void _handleTenantMigrationConflict(Status status) {
+ auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>();
+ invariant(migrationConflictInfo);
+
+ auto& mtabByPrefix =
+ TenantMigrationAccessBlockerByPrefix::get(_rec->getOpCtx()->getServiceContext());
+ if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(
+ migrationConflictInfo->getDatabasePrefix())) {
+ _rec->getReplyBuilder()->getBodyBuilder().resetToEmpty();
+ mtab->checkIfCanWriteOrBlock(_rec->getOpCtx());
+ }
+ }
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const unique_function<Future<void>()> _callable;
+};
+
+Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable) {
+ return std::make_shared<MigrationConflictHandler>(std::move(rec), std::move(callable))->run();
+}
+
} // namespace tenant_migration_donor
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 6c47c594621..2334f3fda77 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -35,9 +35,12 @@
#include "mongo/db/repl/tenant_migration_access_blocker_by_prefix.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -73,46 +76,13 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat
void onWriteToDatabase(OperationContext* opCtx, StringData dbName);
/**
- * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated
- * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then
- * throws TenantMigrationCommitted or TenantMigrationAborted.
+ * Returns a future that asynchronously schedules and runs the argument function 'callable'. If it
+ * throws a TenantMigrationConflict error (as indicated in 'replyBuilder'), clears 'replyBuilder'
+ * and blocks until the migration commits or aborts, then returns TenantMigrationCommitted or
+ * TenantMigrationAborted.
*/
-template <typename Callable>
-void migrationConflictHandler(OperationContext* opCtx,
- StringData dbName,
- Callable&& callable,
- rpc::ReplyBuilderInterface* replyBuilder) {
- checkIfCanReadOrBlock(opCtx, dbName);
-
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
-
- try {
- // callable will modify replyBuilder.
- callable();
- auto replyBodyBuilder = replyBuilder->getBodyBuilder();
-
- // getStatusFromWriteCommandReply expects an 'ok' field.
- CommandHelpers::extractOrAppendOk(replyBodyBuilder);
-
- // Commands such as insert, update, delete, and applyOps return the result as a status
- // rather than throwing.
- const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
-
- if (status == ErrorCodes::TenantMigrationConflict) {
- uassertStatusOK(status);
- }
- return;
- } catch (const TenantMigrationConflictException& ex) {
- auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>();
- invariant(migrationConflictInfo);
-
- if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(
- migrationConflictInfo->getDatabasePrefix())) {
- replyBuilder->getBodyBuilder().resetToEmpty();
- mtab->checkIfCanWriteOrBlock(opCtx);
- }
- }
-}
+Future<void> migrationConflictHandler(std::shared_ptr<RequestExecutionContext> rec,
+ unique_function<Future<void>()> callable);
} // namespace tenant_migration_donor
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 6b17d5fff44..2b20ed67bab 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -735,7 +735,7 @@ public:
private:
class SessionCheckoutPath;
- Future<void> _runInvocation();
+ Future<void> _runInvocation() noexcept;
const std::shared_ptr<ExecCommandDatabase> _ecd;
};
@@ -911,22 +911,13 @@ Future<void> InvokeCommand::SessionCheckoutPath::_checkOutSession() {
return Status::OK();
}
-Future<void> InvokeCommand::_runInvocation() try {
+Future<void> InvokeCommand::_runInvocation() noexcept {
auto execContext = _ecd->getExecutionContext();
- OperationContext* opCtx = execContext->getOpCtx();
- const OpMsgRequest& request = execContext->getRequest();
- CommandInvocation* invocation = _ecd->getInvocation().get();
- rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
-
- tenant_migration_donor::migrationConflictHandler(
- opCtx,
- request.getDatabase(),
- [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
- replyBuilder);
-
- return Status::OK();
-} catch (const DBException& ex) {
- return ex.toStatus();
+ return tenant_migration_donor::migrationConflictHandler(
+ execContext, [execContext, invocation = _ecd->getInvocation()] {
+ return CommandHelpers::runCommandInvocationAsync(std::move(execContext),
+ std::move(invocation));
+ });
}
void InvokeCommand::SessionCheckoutPath::_tapError(Status status) {