diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-09-17 17:36:35 +0000 |
---|---|---|
committer | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-09-17 17:36:35 +0000 |
commit | 66b412e1658a3db4935385147b0048a37b39193e (patch) | |
tree | 974aa4a1f9e23d441ce2c7aa0b20b4ff9236a033 | |
parent | 5f3f8d97455eb1a599bf29d3ef2c981c3be0f265 (diff) | |
download | mongo-66b412e1658a3db4935385147b0048a37b39193e.tar.gz |
SERVER-49107 Add support for async execution to handleRequest
This patch extends handleRequest to capture context for out-of-line
execution and makes ServiceStateMachine own the opCtx used for
command execution.
-rw-r--r-- | src/mongo/db/request_execution_context.h | 131 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 299 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.h | 2 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/embedded/service_entry_point_embedded.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 32 |
6 files changed, 334 insertions, 134 deletions
diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h new file mode 100644 index 00000000000..b34c7e4f3b7 --- /dev/null +++ b/src/mongo/db/request_execution_context.h @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" +#include "mongo/rpc/message.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/rpc/reply_builder_interface.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +class Command; + +/** + * Captures the execution context for a command to allow safe, shared and asynchronous accesses. + * This class owns all objects that participate in command execution (e.g., `request`). The only + * exceptions are `opCtx` and `command`. The `opCtx` remains valid so long as its corresponding + * client is attached to the executor thread. In case of `command`, it is a global, static + * construct and is safe to be accessed through raw pointers. + * Any access from a client thread that does not own the `opCtx`, or after the `opCtx` is + * released is strictly forbidden. + */ +class RequestExecutionContext { +public: + RequestExecutionContext() = delete; + RequestExecutionContext(const RequestExecutionContext&) = delete; + RequestExecutionContext(RequestExecutionContext&&) = delete; + + explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {} + + auto getOpCtx() const { + invariant(_isOnClientThread()); + return _opCtx; + } + + void setMessage(Message message) { + invariant(_isOnClientThread() && !_message); + _message = std::move(message); + _dbmsg = std::make_unique<DbMessage>(_message.get()); + } + const Message& getMessage() const { + invariant(_isOnClientThread() && _message); + return _message.get(); + } + DbMessage& getDbMessage() const { + invariant(_isOnClientThread() && _dbmsg); + return *_dbmsg.get(); + } + + void setRequest(OpMsgRequest request) { + invariant(_isOnClientThread() && !_request); + _request = std::move(request); + } + const OpMsgRequest& getRequest() const { + invariant(_isOnClientThread() && _request); + return _request.get(); + } + + void setCommand(Command* command) { + invariant(_isOnClientThread() && !_command); + _command = command; + } + Command* getCommand() const { + invariant(_isOnClientThread()); + return _command; + } + + void setReplyBuilder(std::unique_ptr<rpc::ReplyBuilderInterface> replyBuilder) { + invariant(_isOnClientThread() && !_replyBuilder); + _replyBuilder = std::move(replyBuilder); + } + auto getReplyBuilder() const { + invariant(_isOnClientThread() && _replyBuilder); + return _replyBuilder.get(); + } + + void setResponse(DbResponse response) { + invariant(_isOnClientThread()); + _response = std::move(response); + } + DbResponse& getResponse() { + invariant(_isOnClientThread()); + return _response; + } + +private: + bool _isOnClientThread() const { + return _opCtx != nullptr && Client::getCurrent() == _opCtx->getClient(); + } + + OperationContext* const _opCtx; + boost::optional<Message> _message; + std::unique_ptr<DbMessage> _dbmsg; + boost::optional<OpMsgRequest> _request; + Command* _command = nullptr; + std::unique_ptr<rpc::ReplyBuilderInterface> _replyBuilder; + DbResponse _response; +}; + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index f01218bba70..f1569ffb0fb 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -73,6 +73,7 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/tenant_migration_donor_util.h" +#include "mongo/db/request_execution_context.h" #include "mongo/db/run_op_kill_cursors.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" @@ -128,58 +129,73 @@ namespace { using namespace fmt::literals; -/** Allows for the very complex handleRequest function to be decomposed into parts. */ +/* + * Allows for the very complex handleRequest function to be decomposed into parts. + * It also provides the infrastructure to futurize the process of executing commands. + */ struct HandleRequest { - struct OpRunner { - explicit OpRunner(HandleRequest* hr) : hr{hr} {} - virtual ~OpRunner() = default; - virtual DbResponse run() = 0; - HandleRequest* hr; - }; - - HandleRequest(OperationContext* opCtx, - const Message& m, - const ServiceEntryPointCommon::Hooks& behaviors) - : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {} + // Maintains the context (e.g., opCtx and replyBuilder) required for command execution. + class ExecutionContext final : public RequestExecutionContext { + public: + ExecutionContext(OperationContext* opCtx, + Message msg, + std::unique_ptr<const ServiceEntryPointCommon::Hooks> hooks) + : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) { + // It also initializes dbMessage, which is accessible via getDbMessage() + setMessage(std::move(msg)); + } + ~ExecutionContext() = default; - DbResponse run(); + Client& client() const { + return *getOpCtx()->getClient(); + } - NetworkOp op() const { - return m.operation(); - } + NetworkOp op() const { + return getMessage().operation(); + } - CurOp& currentOp() { - return *CurOp::get(opCtx); - } + CurOp& currentOp() { + return *CurOp::get(getOpCtx()); + } - NamespaceString nsString() const { - if (!dbmsg.messageShouldHaveNs()) - return {}; - return NamespaceString(dbmsg.getns()); - } + NamespaceString nsString() const { + auto& dbmsg = getDbMessage(); + if (!dbmsg.messageShouldHaveNs()) + return {}; + return NamespaceString(dbmsg.getns()); + } - void assertValidNsString() { - if (!nsString().isValid()) { - uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false); + void assertValidNsString() { + if (!nsString().isValid()) { + uassert( + 16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false); + } } - } - std::unique_ptr<OpRunner> makeOpRunner(); - void startOperation(); - void completeOperation(const DbResponse& dbresponse); + std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors; + boost::optional<long long> slowMsOverride; + bool forceLog = false; + }; - Client& client() const { - return *opCtx->getClient(); - } + struct OpRunner { + explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext} {} + virtual ~OpRunner() = default; + virtual Future<DbResponse> run() = 0; + std::shared_ptr<ExecutionContext> executionContext; + }; + + HandleRequest(OperationContext* opCtx, + const Message& msg, + std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors) + : executionContext(std::make_shared<ExecutionContext>( + opCtx, const_cast<Message&>(msg), std::move(behaviors))) {} - OperationContext* opCtx; - const Message& m; - const ServiceEntryPointCommon::Hooks& behaviors; + std::unique_ptr<OpRunner> makeOpRunner(); - DbMessage dbmsg; + Future<void> startOperation(); + Future<void> completeOperation(); - boost::optional<long long> slowMsOverride; - bool forceLog = false; + std::shared_ptr<ExecutionContext> executionContext; }; void generateLegacyQueryErrorResponse(const AssertionException& exception, @@ -1721,31 +1737,51 @@ DbResponse receivedGetMore(OperationContext* opCtx, struct CommandOpRunner : HandleRequest::OpRunner { using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors); + Future<DbResponse> run() override try { + DbResponse r = receivedCommands(executionContext->getOpCtx(), + executionContext->getMessage(), + *executionContext->behaviors); // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. - if (auto command = hr->currentOp().getCommand(); + if (auto command = executionContext->currentOp().getCommand(); command && (command->getName() == "hello")) { - hr->slowMsOverride = + executionContext->slowMsOverride = 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime); } return r; + } catch (const DBException& ex) { + return ex.toStatus(); } }; -struct QueryOpRunner : HandleRequest::OpRunner { +// Allows wrapping synchronous code in futures without repeating the try-catch block. +struct SynchronousOpRunner : HandleRequest::OpRunner { using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - hr->opCtx->markKillOnClientDisconnect(); - return receivedQuery( - hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors); + virtual DbResponse runSync() = 0; + Future<DbResponse> run() final try { return runSync(); } catch (const DBException& ex) { + return ex.toStatus(); } }; -struct GetMoreOpRunner : HandleRequest::OpRunner { - using HandleRequest::OpRunner::OpRunner; - DbResponse run() override { - return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog); +struct QueryOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + auto opCtx = executionContext->getOpCtx(); + opCtx->markKillOnClientDisconnect(); + return receivedQuery(opCtx, + executionContext->nsString(), + executionContext->client(), + executionContext->getMessage(), + *executionContext->behaviors); + } +}; + +struct GetMoreOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + return receivedGetMore(executionContext->getOpCtx(), + executionContext->getMessage(), + executionContext->currentOp(), + &executionContext->forceLog); } }; @@ -1755,49 +1791,70 @@ struct GetMoreOpRunner : HandleRequest::OpRunner { * class provides a `run` that calls it and handles error reporting * via the `LastError` slot. */ -struct FireAndForgetOpRunner : HandleRequest::OpRunner { - using HandleRequest::OpRunner::OpRunner; +struct FireAndForgetOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; virtual void runAndForget() = 0; - DbResponse run() final; + DbResponse runSync() final; }; struct KillCursorsOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->currentOp().ensureStarted(); - hr->slowMsOverride = 10; - receivedKillCursors(hr->opCtx, hr->m); + executionContext->currentOp().ensureStarted(); + executionContext->slowMsOverride = 10; + receivedKillCursors(executionContext->getOpCtx(), executionContext->getMessage()); } }; struct InsertOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedInsert(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedInsert(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); } }; struct UpdateOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedUpdate(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedUpdate(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); } }; struct DeleteOpRunner : FireAndForgetOpRunner { using FireAndForgetOpRunner::FireAndForgetOpRunner; void runAndForget() override { - hr->assertValidNsString(); - receivedDelete(hr->opCtx, hr->nsString(), hr->m); + executionContext->assertValidNsString(); + receivedDelete(executionContext->getOpCtx(), + executionContext->nsString(), + executionContext->getMessage()); + } +}; + +struct UnsupportedOpRunner : SynchronousOpRunner { + using SynchronousOpRunner::SynchronousOpRunner; + DbResponse runSync() override { + // For compatibility reasons, we only log incidents of receiving operations that are not + // supported and return an empty response to the caller. + LOGV2(21968, + "Operation isn't supported: {operation}", + "Operation is not supported", + "operation"_attr = static_cast<int>(executionContext->op())); + executionContext->currentOp().done(); + executionContext->forceLog = true; + return {}; } }; std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() { - switch (op()) { + switch (executionContext->op()) { case dbQuery: - if (!nsString().isCommand()) + if (!executionContext->nsString().isCommand()) return std::make_unique<QueryOpRunner>(this); // FALLTHROUGH: it's a query containing a command case dbMsg: @@ -1813,96 +1870,93 @@ std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() { case dbDelete: return std::make_unique<DeleteOpRunner>(this); default: - LOGV2(21968, - "Operation isn't supported: {operation}", - "Operation is not supported", - "operation"_attr = static_cast<int>(op())); - return nullptr; + return std::make_unique<UnsupportedOpRunner>(this); } } -DbResponse FireAndForgetOpRunner::run() { +DbResponse FireAndForgetOpRunner::runSync() { try { runAndForget(); } catch (const AssertionException& ue) { - LastError::get(hr->client()).setLastError(ue.code(), ue.reason()); + LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason()); LOGV2_DEBUG(21969, 3, "Caught Assertion in {networkOp}, continuing: {error}", "Assertion in fire-and-forget operation", - "networkOp"_attr = networkOpToString(hr->op()), + "networkOp"_attr = networkOpToString(executionContext->op()), "error"_attr = redact(ue)); - hr->currentOp().debug().errInfo = ue.toStatus(); + executionContext->currentOp().debug().errInfo = ue.toStatus(); } // A NotWritablePrimary error can be set either within // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above. // Either way, we want to throw an exception here, which will cause the client to be // disconnected. - if (LastError::get(hr->client()).hadNotPrimaryError()) { + if (LastError::get(executionContext->client()).hadNotPrimaryError()) { notPrimaryLegacyUnackWrites.increment(); uasserted(ErrorCodes::NotWritablePrimary, str::stream() << "Not-master error while processing '" - << networkOpToString(hr->op()) << "' operation on '" - << hr->nsString() << "' namespace via legacy " + << networkOpToString(executionContext->op()) << "' operation on '" + << executionContext->nsString() << "' namespace via legacy " << "fire-and-forget command execution."); } return {}; } -void HandleRequest::startOperation() { - if (client().isInDirectClient()) { +Future<void> HandleRequest::startOperation() try { + auto opCtx = executionContext->getOpCtx(); + auto& client = executionContext->client(); + auto& currentOp = executionContext->currentOp(); + + if (client.isInDirectClient()) { if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) { invariant(!opCtx->inMultiDocumentTransaction() && !opCtx->lockState()->inAWriteUnitOfWork()); } } else { - LastError::get(client()).startRequest(); - AuthorizationSession::get(client())->startRequest(opCtx); + LastError::get(client).startRequest(); + AuthorizationSession::get(client)->startRequest(opCtx); // We should not be holding any locks at this point invariant(!opCtx->lockState()->isLocked()); } { - stdx::lock_guard<Client> lk(client()); + stdx::lock_guard<Client> lk(client); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. - currentOp().setNetworkOp_inlock(op()); - currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op())); + currentOp.setNetworkOp_inlock(executionContext->op()); + currentOp.setLogicalOp_inlock(networkOpToLogicalOp(executionContext->op())); } + return {}; +} catch (const DBException& ex) { + return ex.toStatus(); } -DbResponse HandleRequest::run() { - startOperation(); - DbResponse dbresponse; - if (auto opRunner = makeOpRunner()) { - dbresponse = opRunner->run(); - } else { - currentOp().done(); - forceLog = true; - } - completeOperation(dbresponse); - return dbresponse; -} +Future<void> HandleRequest::completeOperation() try { + auto opCtx = executionContext->getOpCtx(); + auto& currentOp = executionContext->currentOp(); -void HandleRequest::completeOperation(const DbResponse& dbresponse) { // Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether // this op should be written to the profiler. - const bool shouldProfile = currentOp().completeAndLogOperation( - opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog); + const bool shouldProfile = + currentOp.completeAndLogOperation(opCtx, + MONGO_LOGV2_DEFAULT_COMPONENT, + executionContext->getResponse().response.size(), + executionContext->slowMsOverride, + executionContext->forceLog); Top::get(opCtx->getServiceContext()) .incrementGlobalLatencyStats( opCtx, - durationCount<Microseconds>(currentOp().elapsedTimeExcludingPauses()), - currentOp().getReadWriteType()); + durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), + currentOp.getReadWriteType()); if (shouldProfile) { // Performance profiling is on if (opCtx->lockState()->isReadLocked()) { LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock"); - } else if (client().isInDirectClient()) { + } else if (executionContext->client().isInDirectClient()) { LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient"); - } else if (behaviors.lockedForWriting()) { + } else if (executionContext->behaviors->lockedForWriting()) { // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post // lockedForWriting() call but prior to profile collection lock acquisition. LOGV2_DEBUG(21972, 1, "Note: not profiling because doing fsync+lock"); @@ -1910,11 +1964,14 @@ void HandleRequest::completeOperation(const DbResponse& dbresponse) { LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only"); } else { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - profile(opCtx, op()); + profile(opCtx, executionContext->op()); } } recordCurOpMetrics(opCtx); + return {}; +} catch (const DBException& ex) { + return ex.toStatus(); } } // namespace @@ -1928,15 +1985,27 @@ BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* comman return bob.obj(); } -Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, - const Message& m, - const Hooks& behaviors) noexcept { - try { - return Future<DbResponse>::makeReady(HandleRequest{opCtx, m, behaviors}.run()); - } catch (DBException& e) { - LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e)); - return e.toStatus(); - } +Future<DbResponse> ServiceEntryPointCommon::handleRequest( + OperationContext* opCtx, const Message& m, std::unique_ptr<const Hooks> behaviors) noexcept { + auto hr = std::make_shared<HandleRequest>(opCtx, m, std::move(behaviors)); + return hr->startOperation() + .then([hr]() -> Future<void> { + auto opRunner = hr->makeOpRunner(); + invariant(opRunner); + return opRunner->run().then( + [execContext = hr->executionContext](DbResponse response) -> void { + // Set the response upon successful execution + execContext->setResponse(std::move(response)); + }); + }) + .then([hr] { return hr->completeOperation(); }) + .onCompletion([hr](Status status) -> Future<DbResponse> { + if (!status.isOK()) { + LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(status)); + return status; + } + return hr->executionContext->getResponse(); + }); } ServiceEntryPointCommon::Hooks::~Hooks() = default; diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h index 090cf302b48..7c9dff81636 100644 --- a/src/mongo/db/service_entry_point_common.h +++ b/src/mongo/db/service_entry_point_common.h @@ -101,7 +101,7 @@ struct ServiceEntryPointCommon { static Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& m, - const Hooks& hooks) noexcept; + std::unique_ptr<const Hooks> hooks) noexcept; /** * Produce a new object based on cmdObj, but with redactions applied as specified by diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 75b28b6595a..1e037cd3717 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -271,7 +271,7 @@ public: Future<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) noexcept { - return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); + return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>()); } } // namespace mongo diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp index 1dbf1a52c95..a2ffc881b4b 100644 --- a/src/mongo/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/embedded/service_entry_point_embedded.cpp @@ -117,7 +117,7 @@ Future<DbResponse> ServiceEntryPointEmbedded::handleRequest(OperationContext* op // guarantees of the state (that they have run). checked_cast<PeriodicRunnerEmbedded*>(opCtx->getServiceContext()->getPeriodicRunner()) ->tryPump(); - return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); + return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>()); } void ServiceEntryPointEmbedded::startSession(transport::SessionHandle session) { diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 6fde0057ed2..0cb587426e9 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -316,9 +316,11 @@ private: Message _inMessage; Message _outMessage; - // Allows delegating destruction of opCtx to another function to potentially remove its cost - // from the critical path. This is currently only used in `processMessage()`. - ServiceContext::UniqueOperationContext _killedOpCtx; + // Owns the instance of OperationContext that is used to process ingress requests (i.e., + // `handleRequest`). It also allows delegating destruction of opCtx to another function to + // potentially remove its cost from the critical path. This is currently only used in + // `processMessage()`. + ServiceContext::UniqueOperationContext _opCtx; }; /* @@ -668,23 +670,21 @@ Future<void> ServiceStateMachine::Impl::processMessage() { networkCounter.hitLogicalIn(_inMessage.size()); // Pass sourced Message to handler to generate response. - auto opCtx = Client::getCurrent()->makeOperationContext(); + invariant(!_opCtx); + _opCtx = Client::getCurrent()->makeOperationContext(); if (_inExhaust) { - opCtx->markKillOnClientDisconnect(); + _opCtx->markKillOnClientDisconnect(); } // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. - return _sep->handleRequest(opCtx.get(), _inMessage) - .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)]( - DbResponse dbresponse) mutable -> void { + return _sep->handleRequest(_opCtx.get(), _inMessage) + .then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void { // opCtx must be killed and delisted here so that the operation cannot show up in // currentOp results after the response reaches the client. The destruction is postponed // for later to mitigate its performance impact on the critical path of execution. - _serviceContext->killAndDelistOperation(opCtx.get(), + _serviceContext->killAndDelistOperation(_opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted); - invariant(!_killedOpCtx); - _killedOpCtx = std::move(opCtx); // Format our response, if we have one Message& toSink = dbresponse.response; @@ -776,8 +776,8 @@ void ServiceStateMachine::Impl::runOnce() { .getAsync([this](Status status) { // Destroy the opCtx (already killed) here, to potentially use the delay between // clients' requests to hide the destruction cost. - if (MONGO_likely(_killedOpCtx)) { - _killedOpCtx.reset(); + if (MONGO_likely(_opCtx)) { + _opCtx.reset(); } if (!status.isOK()) { _state.store(State::EndSession); @@ -870,10 +870,10 @@ void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) { void ServiceStateMachine::Impl::cleanupSession() { // Ensure the delayed destruction of opCtx always happens before doing the cleanup. - if (MONGO_likely(_killedOpCtx)) { - _killedOpCtx.reset(); + if (MONGO_likely(_opCtx)) { + _opCtx.reset(); } - invariant(!_killedOpCtx); + invariant(!_opCtx); cleanupExhaustResources(); |