summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-17 17:36:35 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-17 17:36:35 +0000
commit66b412e1658a3db4935385147b0048a37b39193e (patch)
tree974aa4a1f9e23d441ce2c7aa0b20b4ff9236a033
parent5f3f8d97455eb1a599bf29d3ef2c981c3be0f265 (diff)
downloadmongo-66b412e1658a3db4935385147b0048a37b39193e.tar.gz
SERVER-49107 Add support for async execution to handleRequest
This patch extends handleRequest to capture context for out-of-line execution and makes ServiceStateMachine own the opCtx used for command execution.
-rw-r--r--src/mongo/db/request_execution_context.h131
-rw-r--r--src/mongo/db/service_entry_point_common.cpp299
-rw-r--r--src/mongo/db/service_entry_point_common.h2
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp2
-rw-r--r--src/mongo/embedded/service_entry_point_embedded.cpp2
-rw-r--r--src/mongo/transport/service_state_machine.cpp32
6 files changed, 334 insertions, 134 deletions
diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h
new file mode 100644
index 00000000000..b34c7e4f3b7
--- /dev/null
+++ b/src/mongo/db/request_execution_context.h
@@ -0,0 +1,131 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/db/dbmessage.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/rpc/message.h"
+#include "mongo/rpc/op_msg.h"
+#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+class Command;
+
+/**
+ * Captures the execution context for a command to allow safe, shared and asynchronous accesses.
+ * This class owns all objects that participate in command execution (e.g., `request`). The only
+ * exceptions are `opCtx` and `command`. The `opCtx` remains valid so long as its corresponding
+ * client is attached to the executor thread. In case of `command`, it is a global, static
+ * construct and is safe to be accessed through raw pointers.
+ * Any access from a client thread that does not own the `opCtx`, or after the `opCtx` is
+ * released is strictly forbidden.
+ */
+class RequestExecutionContext {
+public:
+ RequestExecutionContext() = delete;
+ RequestExecutionContext(const RequestExecutionContext&) = delete;
+ RequestExecutionContext(RequestExecutionContext&&) = delete;
+
+ explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+ auto getOpCtx() const {
+ invariant(_isOnClientThread());
+ return _opCtx;
+ }
+
+ void setMessage(Message message) {
+ invariant(_isOnClientThread() && !_message);
+ _message = std::move(message);
+ _dbmsg = std::make_unique<DbMessage>(_message.get());
+ }
+ const Message& getMessage() const {
+ invariant(_isOnClientThread() && _message);
+ return _message.get();
+ }
+ DbMessage& getDbMessage() const {
+ invariant(_isOnClientThread() && _dbmsg);
+ return *_dbmsg.get();
+ }
+
+ void setRequest(OpMsgRequest request) {
+ invariant(_isOnClientThread() && !_request);
+ _request = std::move(request);
+ }
+ const OpMsgRequest& getRequest() const {
+ invariant(_isOnClientThread() && _request);
+ return _request.get();
+ }
+
+ void setCommand(Command* command) {
+ invariant(_isOnClientThread() && !_command);
+ _command = command;
+ }
+ Command* getCommand() const {
+ invariant(_isOnClientThread());
+ return _command;
+ }
+
+ void setReplyBuilder(std::unique_ptr<rpc::ReplyBuilderInterface> replyBuilder) {
+ invariant(_isOnClientThread() && !_replyBuilder);
+ _replyBuilder = std::move(replyBuilder);
+ }
+ auto getReplyBuilder() const {
+ invariant(_isOnClientThread() && _replyBuilder);
+ return _replyBuilder.get();
+ }
+
+ void setResponse(DbResponse response) {
+ invariant(_isOnClientThread());
+ _response = std::move(response);
+ }
+ DbResponse& getResponse() {
+ invariant(_isOnClientThread());
+ return _response;
+ }
+
+private:
+ bool _isOnClientThread() const {
+ return _opCtx != nullptr && Client::getCurrent() == _opCtx->getClient();
+ }
+
+ OperationContext* const _opCtx;
+ boost::optional<Message> _message;
+ std::unique_ptr<DbMessage> _dbmsg;
+ boost::optional<OpMsgRequest> _request;
+ Command* _command = nullptr;
+ std::unique_ptr<rpc::ReplyBuilderInterface> _replyBuilder;
+ DbResponse _response;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index f01218bba70..f1569ffb0fb 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/db/run_op_kill_cursors.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
@@ -128,58 +129,73 @@ namespace {
using namespace fmt::literals;
-/** Allows for the very complex handleRequest function to be decomposed into parts. */
+/*
+ * Allows for the very complex handleRequest function to be decomposed into parts.
+ * It also provides the infrastructure to futurize the process of executing commands.
+ */
struct HandleRequest {
- struct OpRunner {
- explicit OpRunner(HandleRequest* hr) : hr{hr} {}
- virtual ~OpRunner() = default;
- virtual DbResponse run() = 0;
- HandleRequest* hr;
- };
-
- HandleRequest(OperationContext* opCtx,
- const Message& m,
- const ServiceEntryPointCommon::Hooks& behaviors)
- : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {}
+ // Maintains the context (e.g., opCtx and replyBuilder) required for command execution.
+ class ExecutionContext final : public RequestExecutionContext {
+ public:
+ ExecutionContext(OperationContext* opCtx,
+ Message msg,
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> hooks)
+ : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) {
+ // It also initializes dbMessage, which is accessible via getDbMessage()
+ setMessage(std::move(msg));
+ }
+ ~ExecutionContext() = default;
- DbResponse run();
+ Client& client() const {
+ return *getOpCtx()->getClient();
+ }
- NetworkOp op() const {
- return m.operation();
- }
+ NetworkOp op() const {
+ return getMessage().operation();
+ }
- CurOp& currentOp() {
- return *CurOp::get(opCtx);
- }
+ CurOp& currentOp() {
+ return *CurOp::get(getOpCtx());
+ }
- NamespaceString nsString() const {
- if (!dbmsg.messageShouldHaveNs())
- return {};
- return NamespaceString(dbmsg.getns());
- }
+ NamespaceString nsString() const {
+ auto& dbmsg = getDbMessage();
+ if (!dbmsg.messageShouldHaveNs())
+ return {};
+ return NamespaceString(dbmsg.getns());
+ }
- void assertValidNsString() {
- if (!nsString().isValid()) {
- uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false);
+ void assertValidNsString() {
+ if (!nsString().isValid()) {
+ uassert(
+ 16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false);
+ }
}
- }
- std::unique_ptr<OpRunner> makeOpRunner();
- void startOperation();
- void completeOperation(const DbResponse& dbresponse);
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors;
+ boost::optional<long long> slowMsOverride;
+ bool forceLog = false;
+ };
- Client& client() const {
- return *opCtx->getClient();
- }
+ struct OpRunner {
+ explicit OpRunner(HandleRequest* hr) : executionContext{hr->executionContext} {}
+ virtual ~OpRunner() = default;
+ virtual Future<DbResponse> run() = 0;
+ std::shared_ptr<ExecutionContext> executionContext;
+ };
+
+ HandleRequest(OperationContext* opCtx,
+ const Message& msg,
+ std::unique_ptr<const ServiceEntryPointCommon::Hooks> behaviors)
+ : executionContext(std::make_shared<ExecutionContext>(
+ opCtx, const_cast<Message&>(msg), std::move(behaviors))) {}
- OperationContext* opCtx;
- const Message& m;
- const ServiceEntryPointCommon::Hooks& behaviors;
+ std::unique_ptr<OpRunner> makeOpRunner();
- DbMessage dbmsg;
+ Future<void> startOperation();
+ Future<void> completeOperation();
- boost::optional<long long> slowMsOverride;
- bool forceLog = false;
+ std::shared_ptr<ExecutionContext> executionContext;
};
void generateLegacyQueryErrorResponse(const AssertionException& exception,
@@ -1721,31 +1737,51 @@ DbResponse receivedGetMore(OperationContext* opCtx,
struct CommandOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors);
+ Future<DbResponse> run() override try {
+ DbResponse r = receivedCommands(executionContext->getOpCtx(),
+ executionContext->getMessage(),
+ *executionContext->behaviors);
// Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
- if (auto command = hr->currentOp().getCommand();
+ if (auto command = executionContext->currentOp().getCommand();
command && (command->getName() == "hello")) {
- hr->slowMsOverride =
+ executionContext->slowMsOverride =
2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
}
return r;
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
};
-struct QueryOpRunner : HandleRequest::OpRunner {
+// Allows wrapping synchronous code in futures without repeating the try-catch block.
+struct SynchronousOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- hr->opCtx->markKillOnClientDisconnect();
- return receivedQuery(
- hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors);
+ virtual DbResponse runSync() = 0;
+ Future<DbResponse> run() final try { return runSync(); } catch (const DBException& ex) {
+ return ex.toStatus();
}
};
-struct GetMoreOpRunner : HandleRequest::OpRunner {
- using HandleRequest::OpRunner::OpRunner;
- DbResponse run() override {
- return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog);
+struct QueryOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ auto opCtx = executionContext->getOpCtx();
+ opCtx->markKillOnClientDisconnect();
+ return receivedQuery(opCtx,
+ executionContext->nsString(),
+ executionContext->client(),
+ executionContext->getMessage(),
+ *executionContext->behaviors);
+ }
+};
+
+struct GetMoreOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ return receivedGetMore(executionContext->getOpCtx(),
+ executionContext->getMessage(),
+ executionContext->currentOp(),
+ &executionContext->forceLog);
}
};
@@ -1755,49 +1791,70 @@ struct GetMoreOpRunner : HandleRequest::OpRunner {
* class provides a `run` that calls it and handles error reporting
* via the `LastError` slot.
*/
-struct FireAndForgetOpRunner : HandleRequest::OpRunner {
- using HandleRequest::OpRunner::OpRunner;
+struct FireAndForgetOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
virtual void runAndForget() = 0;
- DbResponse run() final;
+ DbResponse runSync() final;
};
struct KillCursorsOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->currentOp().ensureStarted();
- hr->slowMsOverride = 10;
- receivedKillCursors(hr->opCtx, hr->m);
+ executionContext->currentOp().ensureStarted();
+ executionContext->slowMsOverride = 10;
+ receivedKillCursors(executionContext->getOpCtx(), executionContext->getMessage());
}
};
struct InsertOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedInsert(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedInsert(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
}
};
struct UpdateOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedUpdate(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedUpdate(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
}
};
struct DeleteOpRunner : FireAndForgetOpRunner {
using FireAndForgetOpRunner::FireAndForgetOpRunner;
void runAndForget() override {
- hr->assertValidNsString();
- receivedDelete(hr->opCtx, hr->nsString(), hr->m);
+ executionContext->assertValidNsString();
+ receivedDelete(executionContext->getOpCtx(),
+ executionContext->nsString(),
+ executionContext->getMessage());
+ }
+};
+
+struct UnsupportedOpRunner : SynchronousOpRunner {
+ using SynchronousOpRunner::SynchronousOpRunner;
+ DbResponse runSync() override {
+ // For compatibility reasons, we only log incidents of receiving operations that are not
+ // supported and return an empty response to the caller.
+ LOGV2(21968,
+ "Operation isn't supported: {operation}",
+ "Operation is not supported",
+ "operation"_attr = static_cast<int>(executionContext->op()));
+ executionContext->currentOp().done();
+ executionContext->forceLog = true;
+ return {};
}
};
std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
- switch (op()) {
+ switch (executionContext->op()) {
case dbQuery:
- if (!nsString().isCommand())
+ if (!executionContext->nsString().isCommand())
return std::make_unique<QueryOpRunner>(this);
// FALLTHROUGH: it's a query containing a command
case dbMsg:
@@ -1813,96 +1870,93 @@ std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
case dbDelete:
return std::make_unique<DeleteOpRunner>(this);
default:
- LOGV2(21968,
- "Operation isn't supported: {operation}",
- "Operation is not supported",
- "operation"_attr = static_cast<int>(op()));
- return nullptr;
+ return std::make_unique<UnsupportedOpRunner>(this);
}
}
-DbResponse FireAndForgetOpRunner::run() {
+DbResponse FireAndForgetOpRunner::runSync() {
try {
runAndForget();
} catch (const AssertionException& ue) {
- LastError::get(hr->client()).setLastError(ue.code(), ue.reason());
+ LastError::get(executionContext->client()).setLastError(ue.code(), ue.reason());
LOGV2_DEBUG(21969,
3,
"Caught Assertion in {networkOp}, continuing: {error}",
"Assertion in fire-and-forget operation",
- "networkOp"_attr = networkOpToString(hr->op()),
+ "networkOp"_attr = networkOpToString(executionContext->op()),
"error"_attr = redact(ue));
- hr->currentOp().debug().errInfo = ue.toStatus();
+ executionContext->currentOp().debug().errInfo = ue.toStatus();
}
// A NotWritablePrimary error can be set either within
// receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above.
// Either way, we want to throw an exception here, which will cause the client to be
// disconnected.
- if (LastError::get(hr->client()).hadNotPrimaryError()) {
+ if (LastError::get(executionContext->client()).hadNotPrimaryError()) {
notPrimaryLegacyUnackWrites.increment();
uasserted(ErrorCodes::NotWritablePrimary,
str::stream() << "Not-master error while processing '"
- << networkOpToString(hr->op()) << "' operation on '"
- << hr->nsString() << "' namespace via legacy "
+ << networkOpToString(executionContext->op()) << "' operation on '"
+ << executionContext->nsString() << "' namespace via legacy "
<< "fire-and-forget command execution.");
}
return {};
}
-void HandleRequest::startOperation() {
- if (client().isInDirectClient()) {
+Future<void> HandleRequest::startOperation() try {
+ auto opCtx = executionContext->getOpCtx();
+ auto& client = executionContext->client();
+ auto& currentOp = executionContext->currentOp();
+
+ if (client.isInDirectClient()) {
if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) {
invariant(!opCtx->inMultiDocumentTransaction() &&
!opCtx->lockState()->inAWriteUnitOfWork());
}
} else {
- LastError::get(client()).startRequest();
- AuthorizationSession::get(client())->startRequest(opCtx);
+ LastError::get(client).startRequest();
+ AuthorizationSession::get(client)->startRequest(opCtx);
// We should not be holding any locks at this point
invariant(!opCtx->lockState()->isLocked());
}
{
- stdx::lock_guard<Client> lk(client());
+ stdx::lock_guard<Client> lk(client);
// Commands handling code will reset this if the operation is a command
// which is logically a basic CRUD operation like query, insert, etc.
- currentOp().setNetworkOp_inlock(op());
- currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op()));
+ currentOp.setNetworkOp_inlock(executionContext->op());
+ currentOp.setLogicalOp_inlock(networkOpToLogicalOp(executionContext->op()));
}
+ return {};
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
-DbResponse HandleRequest::run() {
- startOperation();
- DbResponse dbresponse;
- if (auto opRunner = makeOpRunner()) {
- dbresponse = opRunner->run();
- } else {
- currentOp().done();
- forceLog = true;
- }
- completeOperation(dbresponse);
- return dbresponse;
-}
+Future<void> HandleRequest::completeOperation() try {
+ auto opCtx = executionContext->getOpCtx();
+ auto& currentOp = executionContext->currentOp();
-void HandleRequest::completeOperation(const DbResponse& dbresponse) {
// Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether
// this op should be written to the profiler.
- const bool shouldProfile = currentOp().completeAndLogOperation(
- opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog);
+ const bool shouldProfile =
+ currentOp.completeAndLogOperation(opCtx,
+ MONGO_LOGV2_DEFAULT_COMPONENT,
+ executionContext->getResponse().response.size(),
+ executionContext->slowMsOverride,
+ executionContext->forceLog);
Top::get(opCtx->getServiceContext())
.incrementGlobalLatencyStats(
opCtx,
- durationCount<Microseconds>(currentOp().elapsedTimeExcludingPauses()),
- currentOp().getReadWriteType());
+ durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()),
+ currentOp.getReadWriteType());
if (shouldProfile) {
// Performance profiling is on
if (opCtx->lockState()->isReadLocked()) {
LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock");
- } else if (client().isInDirectClient()) {
+ } else if (executionContext->client().isInDirectClient()) {
LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient");
- } else if (behaviors.lockedForWriting()) {
+ } else if (executionContext->behaviors->lockedForWriting()) {
// TODO SERVER-26825: Fix race condition where fsyncLock is acquired post
// lockedForWriting() call but prior to profile collection lock acquisition.
LOGV2_DEBUG(21972, 1, "Note: not profiling because doing fsync+lock");
@@ -1910,11 +1964,14 @@ void HandleRequest::completeOperation(const DbResponse& dbresponse) {
LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only");
} else {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
- profile(opCtx, op());
+ profile(opCtx, executionContext->op());
}
}
recordCurOpMetrics(opCtx);
+ return {};
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
} // namespace
@@ -1928,15 +1985,27 @@ BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* comman
return bob.obj();
}
-Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
- const Message& m,
- const Hooks& behaviors) noexcept {
- try {
- return Future<DbResponse>::makeReady(HandleRequest{opCtx, m, behaviors}.run());
- } catch (DBException& e) {
- LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e));
- return e.toStatus();
- }
+Future<DbResponse> ServiceEntryPointCommon::handleRequest(
+ OperationContext* opCtx, const Message& m, std::unique_ptr<const Hooks> behaviors) noexcept {
+ auto hr = std::make_shared<HandleRequest>(opCtx, m, std::move(behaviors));
+ return hr->startOperation()
+ .then([hr]() -> Future<void> {
+ auto opRunner = hr->makeOpRunner();
+ invariant(opRunner);
+ return opRunner->run().then(
+ [execContext = hr->executionContext](DbResponse response) -> void {
+ // Set the response upon successful execution
+ execContext->setResponse(std::move(response));
+ });
+ })
+ .then([hr] { return hr->completeOperation(); })
+ .onCompletion([hr](Status status) -> Future<DbResponse> {
+ if (!status.isOK()) {
+ LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(status));
+ return status;
+ }
+ return hr->executionContext->getResponse();
+ });
}
ServiceEntryPointCommon::Hooks::~Hooks() = default;
diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h
index 090cf302b48..7c9dff81636 100644
--- a/src/mongo/db/service_entry_point_common.h
+++ b/src/mongo/db/service_entry_point_common.h
@@ -101,7 +101,7 @@ struct ServiceEntryPointCommon {
static Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& m,
- const Hooks& hooks) noexcept;
+ std::unique_ptr<const Hooks> hooks) noexcept;
/**
* Produce a new object based on cmdObj, but with redactions applied as specified by
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 75b28b6595a..1e037cd3717 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -271,7 +271,7 @@ public:
Future<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx,
const Message& m) noexcept {
- return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{});
+ return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());
}
} // namespace mongo
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index 1dbf1a52c95..a2ffc881b4b 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -117,7 +117,7 @@ Future<DbResponse> ServiceEntryPointEmbedded::handleRequest(OperationContext* op
// guarantees of the state (that they have run).
checked_cast<PeriodicRunnerEmbedded*>(opCtx->getServiceContext()->getPeriodicRunner())
->tryPump();
- return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{});
+ return ServiceEntryPointCommon::handleRequest(opCtx, m, std::make_unique<Hooks>());
}
void ServiceEntryPointEmbedded::startSession(transport::SessionHandle session) {
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 6fde0057ed2..0cb587426e9 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -316,9 +316,11 @@ private:
Message _inMessage;
Message _outMessage;
- // Allows delegating destruction of opCtx to another function to potentially remove its cost
- // from the critical path. This is currently only used in `processMessage()`.
- ServiceContext::UniqueOperationContext _killedOpCtx;
+ // Owns the instance of OperationContext that is used to process ingress requests (i.e.,
+ // `handleRequest`). It also allows delegating destruction of opCtx to another function to
+ // potentially remove its cost from the critical path. This is currently only used in
+ // `processMessage()`.
+ ServiceContext::UniqueOperationContext _opCtx;
};
/*
@@ -668,23 +670,21 @@ Future<void> ServiceStateMachine::Impl::processMessage() {
networkCounter.hitLogicalIn(_inMessage.size());
// Pass sourced Message to handler to generate response.
- auto opCtx = Client::getCurrent()->makeOperationContext();
+ invariant(!_opCtx);
+ _opCtx = Client::getCurrent()->makeOperationContext();
if (_inExhaust) {
- opCtx->markKillOnClientDisconnect();
+ _opCtx->markKillOnClientDisconnect();
}
// The handleRequest is implemented in a subclass for mongod/mongos and actually all the
// database work for this request.
- return _sep->handleRequest(opCtx.get(), _inMessage)
- .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)](
- DbResponse dbresponse) mutable -> void {
+ return _sep->handleRequest(_opCtx.get(), _inMessage)
+ .then([this, &compressorMgr = compressorMgr](DbResponse dbresponse) mutable -> void {
// opCtx must be killed and delisted here so that the operation cannot show up in
// currentOp results after the response reaches the client. The destruction is postponed
// for later to mitigate its performance impact on the critical path of execution.
- _serviceContext->killAndDelistOperation(opCtx.get(),
+ _serviceContext->killAndDelistOperation(_opCtx.get(),
ErrorCodes::OperationIsKilledAndDelisted);
- invariant(!_killedOpCtx);
- _killedOpCtx = std::move(opCtx);
// Format our response, if we have one
Message& toSink = dbresponse.response;
@@ -776,8 +776,8 @@ void ServiceStateMachine::Impl::runOnce() {
.getAsync([this](Status status) {
// Destroy the opCtx (already killed) here, to potentially use the delay between
// clients' requests to hide the destruction cost.
- if (MONGO_likely(_killedOpCtx)) {
- _killedOpCtx.reset();
+ if (MONGO_likely(_opCtx)) {
+ _opCtx.reset();
}
if (!status.isOK()) {
_state.store(State::EndSession);
@@ -870,10 +870,10 @@ void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) {
void ServiceStateMachine::Impl::cleanupSession() {
// Ensure the delayed destruction of opCtx always happens before doing the cleanup.
- if (MONGO_likely(_killedOpCtx)) {
- _killedOpCtx.reset();
+ if (MONGO_likely(_opCtx)) {
+ _opCtx.reset();
}
- invariant(!_killedOpCtx);
+ invariant(!_opCtx);
cleanupExhaustResources();