diff options
-rw-r--r-- | src/mongo/db/client_strand.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 18 | ||||
-rw-r--r-- | src/mongo/db/commands/async_command_execution_test.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/commands/dbcommands.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/executor/async_request_executor.cpp | 88 | ||||
-rw-r--r-- | src/mongo/executor/async_request_executor.h | 73 | ||||
-rw-r--r-- | src/mongo/s/commands/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_build_info.cpp | 27 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 37 |
11 files changed, 414 insertions, 27 deletions
diff --git a/src/mongo/db/client_strand.cpp b/src/mongo/db/client_strand.cpp index 470ae6d3133..8a70420f677 100644 --- a/src/mongo/db/client_strand.cpp +++ b/src/mongo/db/client_strand.cpp @@ -71,7 +71,7 @@ void ClientStrand::_setCurrent() noexcept { if (oldThreadName != threadName) { _oldThreadName = oldThreadName.toString(); setThreadName(threadName); - LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Set thread name", "name"_attr = threadName); + LOGV2_DEBUG(4910703, kDiagnosticLogLevel, "Set thread name", "name"_attr = threadName); } } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 5e12faebf7b..acf5ea2e5fc 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -324,6 +324,7 @@ env.Library( "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", '$BUILD_DIR/mongo/db/transaction', '$BUILD_DIR/mongo/db/views/views_mongod', + '$BUILD_DIR/mongo/executor/async_request_executor', '$BUILD_DIR/mongo/util/log_and_backoff', '$BUILD_DIR/mongo/util/net/http_client', 'core', @@ -630,6 +631,23 @@ env.CppUnitTest( ) env.CppUnitTest( + target="async_command_execution_test", + source=[ + "async_command_execution_test.cpp", + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/base", + "$BUILD_DIR/mongo/db/auth/authmocks", + "$BUILD_DIR/mongo/db/auth/authorization_manager_global", + "$BUILD_DIR/mongo/db/commands", + "$BUILD_DIR/mongo/db/commands/standalone", + "$BUILD_DIR/mongo/db/service_context_test_fixture", + "$BUILD_DIR/mongo/unittest/unittest", + "$BUILD_DIR/mongo/util/version_impl", + ], +) + +env.CppUnitTest( target="db_commands_test", source=[ "index_filter_commands_test.cpp", diff --git a/src/mongo/db/commands/async_command_execution_test.cpp b/src/mongo/db/commands/async_command_execution_test.cpp new file mode 100644 index 00000000000..a5f1e38aeb6 --- /dev/null +++ b/src/mongo/db/commands/async_command_execution_test.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include <fmt/format.h> + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" +#include "mongo/db/client_strand.h" +#include "mongo/db/commands.h" +#include "mongo/db/request_execution_context.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/factory.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point.h" + +namespace mongo { +namespace { + +using namespace fmt::literals; + +class AsyncCommandExecutionTest : public unittest::Test, public ScopedGlobalServiceContextForTest { +public: + void runTestForCommand(StringData command) { + BSONObj syncResponse, asyncResponse; + + auto client = getServiceContext()->makeClient("Client"); + auto strand = ClientStrand::make(std::move(client)); + + { + auto ctx = makeExecutionContext(strand, command); + strand->run([&] { syncResponse = getSyncResponse(ctx); }); + } + + { + auto ctx = makeExecutionContext(strand, command); + asyncResponse = getAsyncResponse(strand, ctx); + } + + { + auto ctx = makeExecutionContext(strand, command); + killAsyncCommand(strand, ctx); + } + + ASSERT_BSONOBJ_EQ(syncResponse, asyncResponse); + } + +private: + struct ExecutionContext { + ServiceContext::UniqueOperationContext opCtx; + std::shared_ptr<RequestExecutionContext> rec; + std::shared_ptr<CommandInvocation> invocation; + }; + + ExecutionContext makeExecutionContext(ClientStrandPtr strand, StringData commandName) const { + auto guard = strand->bind(); + ExecutionContext ctx; + ctx.opCtx = cc().makeOperationContext(); + + auto rec = + std::make_shared<RequestExecutionContext>(ctx.opCtx.get(), mockMessage(commandName)); + rec->setReplyBuilder(makeReplyBuilder(rpc::protocolForMessage(rec->getMessage()))); + rec->setRequest(rpc::opMsgRequestFromAnyProtocol(rec->getMessage())); + rec->setCommand(CommandHelpers::findCommand(rec->getRequest().getCommandName())); + + auto cmd = rec->getCommand(); + invariant(cmd); + ctx.invocation = cmd->parse(ctx.opCtx.get(), rec->getRequest()); + ctx.rec = std::move(rec); + return ctx; + } + + BSONObj getSyncResponse(ExecutionContext& ctx) const { + ctx.invocation->run(ctx.rec->getOpCtx(), ctx.rec->getReplyBuilder()); + return ctx.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); + } + + BSONObj getAsyncResponse(ClientStrandPtr strand, ExecutionContext& ctx) const { + Future<void> future; + { + auto guard = strand->bind(); + FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); + future = ctx.invocation->runAsync(ctx.rec); + ASSERT(!future.isReady()); + } + + ASSERT(future.getNoThrow().isOK()); + + return [&] { + auto guard = strand->bind(); + return ctx.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); + }(); + } + + void killAsyncCommand(ClientStrandPtr strand, ExecutionContext& ctx) const { + Future<void> future; + { + auto guard = strand->bind(); + FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); + future = ctx.invocation->runAsync(ctx.rec); + + auto opCtx = ctx.rec->getOpCtx(); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + opCtx->getServiceContext()->killOperation(lk, opCtx, ErrorCodes::Interrupted); + } + + ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::Interrupted); + } + + Message mockMessage(StringData commandName) const { + OpMsgBuilder builder; + builder.setBody(BSON(commandName << 1 << "$db" + << "test")); + return builder.finish(); + } +}; + +TEST_F(AsyncCommandExecutionTest, BuildInfo) { + runTestForCommand("buildinfo"); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index 06634e1e68b..822443eb434 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -83,13 +83,16 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/request_execution_context.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/stats/storage_stats.h" #include "mongo/db/storage/storage_engine_init.h" #include "mongo/db/write_concern.h" +#include "mongo/executor/async_request_executor.h" #include "mongo/logv2/log.h" #include "mongo/scripting/engine.h" #include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/md5.hpp" #include "mongo/util/scopeguard.h" #include "mongo/util/version.h" @@ -781,6 +784,26 @@ public: } cmdDBStats; +// Provides the means to asynchronously run `buildinfo` commands. +class BuildInfoExecutor final : public AsyncRequestExecutor { +public: + BuildInfoExecutor() : AsyncRequestExecutor("BuildInfoExecutor") {} + + Status handleRequest(std::shared_ptr<RequestExecutionContext> rec) { + auto result = rec->getReplyBuilder()->getBodyBuilder(); + VersionInfoInterface::instance().appendBuildInfo(&result); + appendStorageEngineList(rec->getOpCtx()->getServiceContext(), &result); + return Status::OK(); + } + + static BuildInfoExecutor* get(ServiceContext* svc); +}; + +const auto getBuildInfoExecutor = ServiceContext::declareDecoration<BuildInfoExecutor>(); +BuildInfoExecutor* BuildInfoExecutor::get(ServiceContext* svc) { + return const_cast<BuildInfoExecutor*>(&getBuildInfoExecutor(svc)); +} + class CmdBuildInfo : public BasicCommand { public: CmdBuildInfo() : BasicCommand("buildInfo", "buildinfo") {} @@ -816,6 +839,11 @@ public: return true; } + Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec, std::string) override { + auto opCtx = rec->getOpCtx(); + return BuildInfoExecutor::get(opCtx->getServiceContext())->schedule(std::move(rec)); + } + } cmdBuildInfo; } // namespace diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index c6e26cfc796..1f2846e4875 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -602,7 +602,7 @@ private: return _initiateCommand() .then([this] { return _commandExec(); }) .onError([this, anchor = shared_from_this()](Status status) { - return _handleFailure(std::move(status)); + _handleFailure(std::move(status)); }); }); } diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index b10cf01369a..444b0d34786 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -270,6 +270,19 @@ env.Library( ], ) +env.Library( + target='async_request_executor', + source=[ + 'async_request_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + '$BUILD_DIR/mongo/util/fail_point', + ], +) + env.CppUnitTest( target='executor_test', source=[ diff --git a/src/mongo/executor/async_request_executor.cpp b/src/mongo/executor/async_request_executor.cpp new file mode 100644 index 00000000000..cf535e864f6 --- /dev/null +++ b/src/mongo/executor/async_request_executor.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor + +#include "mongo/executor/async_request_executor.h" + +#include "mongo/db/client_strand.h" +#include "mongo/db/operation_context.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/compiler.h" +#include "mongo/util/fail_point.h" + +namespace mongo { + +MONGO_FAIL_POINT_DEFINE(hangBeforeRunningAsyncRequestExecutorTask); + +AsyncRequestExecutor::AsyncRequestExecutor(std::string name) : _name(std::move(name)) { + ThreadPool::Options options; + options.minThreads = 0; + options.maxThreads = 1; + _pool = std::make_unique<ThreadPool>(std::move(options)); + _pool->startup(); + LOGV2_DEBUG( + 4910801, kDiagnosticLogLevel, "Started asynchronous request executor", "name"_attr = _name); +} + +AsyncRequestExecutor::~AsyncRequestExecutor() { + _pool->shutdown(); + _pool->join(); + LOGV2_DEBUG( + 4910802, kDiagnosticLogLevel, "Stopped asynchronous request executor", "name"_attr = _name); +} + +Future<void> AsyncRequestExecutor::schedule(std::shared_ptr<RequestExecutionContext> rec) { + auto opCtx = rec->getOpCtx(); + auto [promise, future] = makePromiseFuture<void>(); + + // `this` remains valid as it owns the instance of thread pool. + _pool->schedule([this, + strand = ClientStrand::get(opCtx->getClient()), + promise = std::move(promise), + rec = std::move(rec)](Status status) mutable { + hangBeforeRunningAsyncRequestExecutorTask.pauseWhileSet(); + strand->run([&] { + promise.setWith([&] { + if (MONGO_unlikely(!status.isOK())) + return status.withContext("Unable to schedule asynchronous request"); + + auto opCtx = rec->getOpCtx(); + if (opCtx->isKillPending()) + return Status(opCtx->getKillStatus(), "Asynchronous operation was interrupted"); + + return handleRequest(std::move(rec)); + }); + }); + }); + + return std::move(future); +} + +} // namespace mongo diff --git a/src/mongo/executor/async_request_executor.h b/src/mongo/executor/async_request_executor.h new file mode 100644 index 00000000000..a54e7848392 --- /dev/null +++ b/src/mongo/executor/async_request_executor.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "mongo/db/request_execution_context.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/future.h" + +namespace mongo { + +/** + * The base class for constructing command-specific asynchronous executors. + * Requests (i.e., instances of `RequestExecutionContext`) are scheduled on a thread-pool, and + * passed to the command-specific implementation of `handleRequest`. + */ +class AsyncRequestExecutor { +public: + AsyncRequestExecutor(AsyncRequestExecutor&&) = delete; + AsyncRequestExecutor(const AsyncRequestExecutor&) = delete; + + explicit AsyncRequestExecutor(std::string name); + ~AsyncRequestExecutor(); + + /** + * Runs the command-specific code to handle the request. + * Must only access the request on the corresponding client thread. + */ + virtual Status handleRequest(std::shared_ptr<RequestExecutionContext>) = 0; + + /** + * Schedules the request on a thread pool (i.e., `_pool`) and calls into `handleRequest` to + * asynchronously execute the command. + */ + Future<void> schedule(std::shared_ptr<RequestExecutionContext> rec); + + static constexpr auto kDiagnosticLogLevel = 4; + +private: + const std::string _name; + std::unique_ptr<ThreadPool> _pool; +}; + +} // namespace mongo diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 8fc761a2e0b..479f16ad6cb 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -103,6 +103,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/servers', + '$BUILD_DIR/mongo/executor/async_request_executor', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/audit', diff --git a/src/mongo/s/commands/cluster_build_info.cpp b/src/mongo/s/commands/cluster_build_info.cpp index 23025e47fcc..f5878c1bed6 100644 --- a/src/mongo/s/commands/cluster_build_info.cpp +++ b/src/mongo/s/commands/cluster_build_info.cpp @@ -32,11 +32,33 @@ #include "mongo/platform/basic.h" #include "mongo/db/commands.h" +#include "mongo/db/request_execution_context.h" +#include "mongo/executor/async_request_executor.h" +#include "mongo/util/future.h" #include "mongo/util/version.h" namespace mongo { namespace { +class ClusterBuildInfoExecutor final : public AsyncRequestExecutor { +public: + ClusterBuildInfoExecutor() : AsyncRequestExecutor("ClusterBuildInfoExecutor") {} + + Status handleRequest(std::shared_ptr<RequestExecutionContext> rec) { + auto result = rec->getReplyBuilder()->getBodyBuilder(); + VersionInfoInterface::instance().appendBuildInfo(&result); + return Status::OK(); + } + + static ClusterBuildInfoExecutor* get(ServiceContext* svc); +}; + +const auto getClusterBuildInfoExecutor = + ServiceContext::declareDecoration<ClusterBuildInfoExecutor>(); +ClusterBuildInfoExecutor* ClusterBuildInfoExecutor::get(ServiceContext* svc) { + return const_cast<ClusterBuildInfoExecutor*>(&getClusterBuildInfoExecutor(svc)); +} + class ClusterCmdBuildInfo : public BasicCommand { public: ClusterCmdBuildInfo() : BasicCommand("buildInfo", "buildinfo") {} @@ -70,6 +92,11 @@ public: return true; } + Future<void> runAsync(std::shared_ptr<RequestExecutionContext> rec, std::string) override { + auto opCtx = rec->getOpCtx(); + return ClusterBuildInfoExecutor::get(opCtx->getServiceContext())->schedule(std::move(rec)); + } + } cmdBuildInfo; } // namespace diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 9a773cd2a48..da3ddab2c8d 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -305,12 +305,6 @@ private: boost::optional<MessageCompressorId> _compressorId; Message _inMessage; Message _outMessage; - - // Owns the instance of OperationContext that is used to process ingress requests (i.e., - // `handleRequest`). It also allows delegating destruction of opCtx to another function to - // potentially remove its cost from the critical path. This is currently only used in - // `processMessage()`. - ServiceContext::UniqueOperationContext _opCtx; }; Future<void> ServiceStateMachine::Impl::sourceMessage() { @@ -448,20 +442,24 @@ Future<void> ServiceStateMachine::Impl::processMessage() { networkCounter.hitLogicalIn(_inMessage.size()); // Pass sourced Message to handler to generate response. - invariant(!_opCtx); - _opCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = Client::getCurrent()->makeOperationContext(); if (_inExhaust) { - _opCtx->markKillOnClientDisconnect(); + opCtx->markKillOnClientDisconnect(); } // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. - return _sep->handleRequest(_opCtx.get(), _inMessage) - .then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void { + return _sep->handleRequest(opCtx.get(), _inMessage) + .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)]( + DbResponse dbresponse) mutable -> void { // opCtx must be killed and delisted here so that the operation cannot show up in - // currentOp results after the response reaches the client. The destruction is postponed - // for later to mitigate its performance impact on the critical path of execution. - _serviceContext->killAndDelistOperation(_opCtx.get(), + // currentOp results after the response reaches the client. Destruction of the already + // killed opCtx is postponed for later (i.e., after completion of the future-chain) to + // mitigate its performance impact on the critical path of execution. + // Note that destroying futures after execution, rather that postponing the destruction + // until completion of the future-chain, would expose the cost of destroying opCtx to + // the critical path and result in serious performance implications. + _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted); // Format our response, if we have one @@ -552,11 +550,6 @@ void ServiceStateMachine::Impl::runOnce() { return sinkMessage(); }) .getAsync([this](Status status) { - // Destroy the opCtx (already killed) here, to potentially use the delay between - // clients' requests to hide the destruction cost. - if (MONGO_likely(_opCtx)) { - _opCtx.reset(); - } if (!status.isOK()) { _state.store(State::EndSession); // The service executor failed to schedule the task. This could for example be that @@ -660,12 +653,6 @@ void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) { void ServiceStateMachine::Impl::cleanupSession(const Status& status) { LOGV2_INFO(5127900, "Ending session", "error"_attr = status); - // Ensure the delayed destruction of opCtx always happens before doing the cleanup. - if (MONGO_likely(_opCtx)) { - _opCtx.reset(); - } - invariant(!_opCtx); - cleanupExhaustResources(); { |