author     Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>    2020-11-02 17:38:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>           2020-11-24 18:37:06 +0000
commit     40c3bac93c2091cea665de686cae93ef90429775 (patch)
tree       ce51224ab1bce9a17d9b62d095d03c0bb16de9fa
parent     5e82f974f44a822753ad0656692a1011a6611b3c (diff)
SERVER-51690 Add support for async command execution to Mongos
 src/mongo/db/request_execution_context.h              |   11
 src/mongo/db/service_entry_point_common.cpp           |    5
 src/mongo/s/commands/cluster_command_test_fixture.cpp |    4
 src/mongo/s/commands/strategy.cpp                     | 1499
 src/mongo/s/commands/strategy.h                       |    5
 src/mongo/s/service_entry_point_mongos.cpp            |  303
 6 files changed, 1073 insertions(+), 754 deletions(-)
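This patch converts the synchronous, exception-based command path in mongos into explicit
future-chains built from the promise/future utilities visible in the diff below. A minimal
sketch of the recurring pattern (doPrologue/doRun/doEpilogue and State are illustrative
stand-ins for the stage methods, not real names from the patch):

    // Sketch of the chaining idiom used by ExecCommandClient::run() and friends.
    Future<void> runStages(std::shared_ptr<State> state) {
        auto pf = makePromiseFuture<void>();
        auto future = std::move(pf.future)
                          .then([state] { return doPrologue(state); })  // may fail with SkipCommandExecution
                          .then([state] { return doRun(state); })
                          .onCompletion([state](Status status) {
                              // SkipCommandExecution means a reply was already written; swallow it.
                              if (!status.isOK() && status.code() != ErrorCodes::SkipCommandExecution)
                                  return status;
                              doEpilogue(state);
                              return Status::OK();
                          });
        pf.promise.emplaceValue();  // start the chain; continuations may run inline
        return future;
    }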
diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h
index b34c7e4f3b7..44f06f36369 100644
--- a/src/mongo/db/request_execution_context.h
+++ b/src/mongo/db/request_execution_context.h
@@ -57,22 +57,21 @@ public:
RequestExecutionContext(const RequestExecutionContext&) = delete;
RequestExecutionContext(RequestExecutionContext&&) = delete;
- explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {}
+ RequestExecutionContext(OperationContext* opCtx, Message message)
+ : _opCtx(opCtx),
+ _message(std::move(message)),
+ _dbmsg(std::make_unique<DbMessage>(_message.get())) {}
auto getOpCtx() const {
invariant(_isOnClientThread());
return _opCtx;
}
- void setMessage(Message message) {
- invariant(_isOnClientThread() && !_message);
- _message = std::move(message);
- _dbmsg = std::make_unique<DbMessage>(_message.get());
- }
const Message& getMessage() const {
invariant(_isOnClientThread() && _message);
return _message.get();
}
+
DbMessage& getDbMessage() const {
invariant(_isOnClientThread() && _dbmsg);
return *_dbmsg.get();
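With the message supplied at construction, a RequestExecutionContext is usable immediately
and the old two-step setMessage() protocol disappears. A hypothetical caller-side sketch:

    // Sketch; opCtx and message come from the surrounding request handler.
    auto rec = std::make_shared<RequestExecutionContext>(opCtx, std::move(message));
    DbMessage& dbm = rec->getDbMessage();  // already initialized; no setMessage() call needed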
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 528b37d071e..26f9c9266b3 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -141,10 +141,7 @@ struct HandleRequest {
ExecutionContext(OperationContext* opCtx,
Message msg,
std::unique_ptr<const ServiceEntryPointCommon::Hooks> hooks)
- : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) {
- // It also initializes dbMessage, which is accessible via getDbMessage()
- setMessage(std::move(msg));
- }
+ : RequestExecutionContext(opCtx, std::move(msg)), behaviors(std::move(hooks)) {}
~ExecutionContext() = default;
Client& client() const {
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp
index 316fc78782f..8c8813a3b11 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.cpp
+++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp
@@ -122,7 +122,9 @@ DbResponse ClusterCommandTestFixture::runCommand(BSONObj cmd) {
auto clusterGLE = ClusterLastErrorInfo::get(client.get());
clusterGLE->newRequest();
- return Strategy::clientCommand(opCtx.get(), opMsgRequest.serialize());
+ AlternativeClientRegion acr(client);
+ auto rec = std::make_shared<RequestExecutionContext>(opCtx.get(), opMsgRequest.serialize());
+ return Strategy::clientCommand(std::move(rec)).get();
}
void ClusterCommandTestFixture::runCommandSuccessful(BSONObj cmd, bool isTargeted) {
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 44a5f0ad044..2523b22204d 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -156,30 +156,28 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res
/**
* Invokes the given command and aborts the transaction on any non-retryable errors.
*/
-void invokeInTransactionRouter(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- rpc::ReplyBuilderInterface* result) {
+Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation) {
+ auto opCtx = rec->getOpCtx();
auto txnRouter = TransactionRouter::get(opCtx);
invariant(txnRouter);
// No-op if the transaction is not running with snapshot read concern.
txnRouter.setDefaultAtClusterTime(opCtx);
- try {
- CommandHelpers::runCommandInvocation(opCtx, request, invocation, result);
- } catch (const DBException& e) {
- if (ErrorCodes::isSnapshotError(e.code()) ||
- ErrorCodes::isNeedRetargettingError(e.code()) ||
- e.code() == ErrorCodes::ShardInvalidatedForTargeting ||
- e.code() == ErrorCodes::StaleDbVersion) {
- // Don't abort on possibly retryable errors.
- throw;
- }
+ return CommandHelpers::runCommandInvocationAsync(rec, std::move(invocation))
+ .tapError([rec = std::move(rec)](Status status) {
+ if (auto code = status.code(); ErrorCodes::isSnapshotError(code) ||
+ ErrorCodes::isNeedRetargettingError(code) ||
+ code == ErrorCodes::ShardInvalidatedForTargeting ||
+ code == ErrorCodes::StaleDbVersion) {
+ // Don't abort on possibly retryable errors.
+ return;
+ }
- txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus());
- throw;
- }
+ auto opCtx = rec->getOpCtx();
+ TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status);
+ });
}
/**
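The rewritten invokeInTransactionRouter keeps the same error classification but expresses the
implicit abort as a tap on the failed future: tapError runs a side effect while the original
error keeps propagating. A reduced sketch, with isRetryableInTransaction as an illustrative
predicate standing in for the inline code checks above:

    // Sketch of the tapError idiom: observe a failure without consuming it.
    Future<void> invokeAbortingOnFatalError(std::shared_ptr<RequestExecutionContext> rec,
                                            std::shared_ptr<CommandInvocation> invocation) {
        return CommandHelpers::runCommandInvocationAsync(rec, std::move(invocation))
            .tapError([rec](Status status) {
                if (isRetryableInTransaction(status.code()))  // illustrative predicate
                    return;  // leave retryable errors to the retry machinery
                auto opCtx = rec->getOpCtx();
                TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status);
            });  // `status` still flows to the caller after the tap
    }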
@@ -187,139 +185,314 @@ void invokeInTransactionRouter(OperationContext* opCtx,
*/
void addContextForTransactionAbortingError(StringData txnIdAsString,
StmtId latestStmtId,
- DBException& ex,
+ Status& status,
StringData reason) {
- ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement "
- << latestStmtId << " due to: " << reason);
+ status.addContext("Transaction {} was aborted on statement {} due to: {}"_format(
+ txnIdAsString, latestStmtId, reason));
}
-void execCommandClient(OperationContext* opCtx,
- CommandInvocation* invocation,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* result) {
- [&] {
- const Command* c = invocation->definition();
-
- const auto dbname = request.getDatabase();
- uassert(ErrorCodes::IllegalOperation,
- "Can't use 'local' database through mongos",
- dbname != NamespaceString::kLocalDb);
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << dbname << "'",
- NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+// Factory class to construct a future-chain that executes the invocation against the database.
+class ExecCommandClient final : public std::enable_shared_from_this<ExecCommandClient> {
+public:
+ ExecCommandClient(ExecCommandClient&&) = delete;
+ ExecCommandClient(const ExecCommandClient&) = delete;
- StringMap<int> topLevelFields;
- for (auto&& element : request.body) {
- StringData fieldName = element.fieldNameStringData();
- if (fieldName == "help" && element.type() == Bool && element.Bool()) {
- std::stringstream help;
- help << "help for: " << c->getName() << " " << c->help();
- auto body = result->getBodyBuilder();
- body.append("help", help.str());
- CommandHelpers::appendSimpleCommandStatus(body, true, "");
- return;
- }
+ ExecCommandClient(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation)
+ : _rec(std::move(rec)), _invocation(std::move(invocation)) {}
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Parsed command object contains duplicate top level key: "
- << fieldName,
- topLevelFields[fieldName]++ == 0);
- }
+ Future<void> run();
- try {
- invocation->checkAuthorization(opCtx, request);
- } catch (const DBException& e) {
+private:
+ // Prepare the environment for running the invocation (e.g., checking authorization).
+ Status _prologue();
+
+ // Returns a future that runs the command invocation.
+ Future<void> _run();
+
+ // Any logic that must be done post command execution, unless an exception is thrown.
+ void _epilogue();
+
+ // Runs at the end of the future-chain returned by `run()` unless an exception, other than
+ // `ErrorCodes::SkipCommandExecution`, is thrown earlier.
+ void _onCompletion();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<CommandInvocation> _invocation;
+};
+
+Status ExecCommandClient::_prologue() {
+ auto opCtx = _rec->getOpCtx();
+ auto result = _rec->getReplyBuilder();
+ const auto& request = _rec->getRequest();
+ const Command* c = _invocation->definition();
+
+ const auto dbname = request.getDatabase();
+ uassert(ErrorCodes::IllegalOperation,
+ "Can't use 'local' database through mongos",
+ dbname != NamespaceString::kLocalDb);
+ uassert(ErrorCodes::InvalidNamespace,
+ "Invalid database name: '{}'"_format(dbname),
+ NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+
+ StringMap<int> topLevelFields;
+ for (auto&& element : request.body) {
+ StringData fieldName = element.fieldNameStringData();
+ if (fieldName == "help" && element.type() == Bool && element.Bool()) {
auto body = result->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus());
- return;
+ body.append("help", "help for: {} {}"_format(c->getName(), c->help()));
+ CommandHelpers::appendSimpleCommandStatus(body, true, "");
+ return {ErrorCodes::SkipCommandExecution, "Already served help command"};
}
- // attach tracking
- rpc::TrackingMetadata trackingMetadata;
- trackingMetadata.initWithOperName(c->getName());
- rpc::TrackingMetadata::get(opCtx) = trackingMetadata;
+ uassert(ErrorCodes::FailedToParse,
+ "Parsed command object contains duplicate top level key: {}"_format(fieldName),
+ topLevelFields[fieldName]++ == 0);
+ }
- // Extract and process metadata from the command request body.
- ReadPreferenceSetting::get(opCtx) =
- uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body));
- VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth());
+ try {
+ _invocation->checkAuthorization(opCtx, request);
+ } catch (const DBException& e) {
+ auto body = result->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus());
+ return {ErrorCodes::SkipCommandExecution, "Failed to check authorization"};
+ }
- auto txnRouter = TransactionRouter::get(opCtx);
- if (txnRouter) {
- invokeInTransactionRouter(opCtx, request, invocation, result);
- } else {
- CommandHelpers::runCommandInvocation(opCtx, request, invocation, result);
- }
+ // attach tracking
+ rpc::TrackingMetadata trackingMetadata;
+ trackingMetadata.initWithOperName(c->getName());
+ rpc::TrackingMetadata::get(opCtx) = trackingMetadata;
+
+ // Extract and process metadata from the command request body.
+ ReadPreferenceSetting::get(opCtx) =
+ uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body));
+ VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth());
+
+ return Status::OK();
+}
- if (invocation->supportsWriteConcern()) {
- failCommand.executeIf(
- [&](const BSONObj& data) {
- result->getBodyBuilder().append(data["writeConcernError"]);
- if (data.hasField(kErrorLabelsFieldName) &&
- data[kErrorLabelsFieldName].type() == Array) {
- auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned();
- if (!labels.isEmpty()) {
- result->getBodyBuilder().append(kErrorLabelsFieldName,
- BSONArray(labels));
- }
+Future<void> ExecCommandClient::_run() {
+ OperationContext* opCtx = _rec->getOpCtx();
+ if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) {
+ return invokeInTransactionRouter(_rec, _invocation);
+ } else {
+ return CommandHelpers::runCommandInvocationAsync(_rec, _invocation);
+ }
+}
+
+void ExecCommandClient::_epilogue() {
+ auto opCtx = _rec->getOpCtx();
+ auto result = _rec->getReplyBuilder();
+ if (_invocation->supportsWriteConcern()) {
+ failCommand.executeIf(
+ [&](const BSONObj& data) {
+ result->getBodyBuilder().append(data["writeConcernError"]);
+ if (data.hasField(kErrorLabelsFieldName) &&
+ data[kErrorLabelsFieldName].type() == Array) {
+ auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned();
+ if (!labels.isEmpty()) {
+ result->getBodyBuilder().append(kErrorLabelsFieldName, BSONArray(labels));
}
- },
- [&](const BSONObj& data) {
- return CommandHelpers::shouldActivateFailCommandFailPoint(
- data, invocation, opCtx->getClient()) &&
- data.hasField("writeConcernError");
- });
- }
+ }
+ },
+ [&](const BSONObj& data) {
+ return CommandHelpers::shouldActivateFailCommandFailPoint(
+ data, _invocation.get(), opCtx->getClient()) &&
+ data.hasField("writeConcernError");
+ });
+ }
- auto body = result->getBodyBuilder();
+ auto body = result->getBodyBuilder();
- bool ok = CommandHelpers::extractOrAppendOk(body);
- if (!ok) {
- c->incrementCommandsFailed();
+ if (bool ok = CommandHelpers::extractOrAppendOk(body); !ok) {
+ const Command* c = _invocation->definition();
+ c->incrementCommandsFailed();
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter.implicitlyAbortTransaction(opCtx,
- getStatusFromCommandResult(body.asTempObj()));
- }
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.implicitlyAbortTransaction(opCtx,
+ getStatusFromCommandResult(body.asTempObj()));
}
- }();
+ }
+}
- auto body = result->getBodyBuilder();
+void ExecCommandClient::_onCompletion() {
+ auto opCtx = _rec->getOpCtx();
+ auto body = _rec->getReplyBuilder()->getBodyBuilder();
appendRequiredFieldsToResponse(opCtx, &body);
+
+ // TODO SERVER-49109 enable the following code-path.
+ /*
+ auto seCtx = transport::ServiceExecutorContext::get(opCtx->getClient());
+ if (!seCtx) {
+ // We were run by a background worker.
+ return;
+ }
+
+ if (!_invocation->isSafeForBorrowedThreads()) {
+ // If the last command wasn't safe for a borrowed thread, then let's move off of it.
+ seCtx->setThreadingModel(transport::ServiceExecutor::ThreadingModel::kDedicated);
+ }
+ */
+}
+
+Future<void> ExecCommandClient::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _prologue(); })
+ .then([this, anchor = shared_from_this()] { return _run(); })
+ .then([this, anchor = shared_from_this()] { _epilogue(); })
+ .onCompletion([this, anchor = shared_from_this()](Status status) {
+ if (!status.isOK() && status.code() != ErrorCodes::SkipCommandExecution)
+ return status; // Execution was interrupted due to an error.
+
+ _onCompletion();
+ return Status::OK();
+ });
+ pf.promise.emplaceValue();
+ return future;
}
MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError);
/**
- * Executes the command for the given request, and appends the result to replyBuilder
- * and error labels, if any, to errorBuilder.
+ * Produces a future-chain that parses the command, runs the parsed command, and captures the result
+ * in replyBuilder.
*/
-void runCommand(OperationContext* opCtx,
- const OpMsgRequest& request,
- const Message& m,
- rpc::ReplyBuilderInterface* replyBuilder,
- BSONObjBuilder* errorBuilder) {
- auto const opType = m.operation();
- auto const commandName = request.getCommandName();
- auto const command = CommandHelpers::findCommand(commandName);
+class ParseAndRunCommand final : public std::enable_shared_from_this<ParseAndRunCommand> {
+public:
+ ParseAndRunCommand(const ParseAndRunCommand&) = delete;
+ ParseAndRunCommand(ParseAndRunCommand&&) = delete;
+
+ ParseAndRunCommand(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<BSONObjBuilder> errorBuilder)
+ : _rec(std::move(rec)),
+ _errorBuilder(std::move(errorBuilder)),
+ _opType(_rec->getMessage().operation()),
+ _commandName(_rec->getRequest().getCommandName()) {}
+
+ Future<void> run();
+
+private:
+ class RunInvocation;
+ class RunAndRetry;
+
+ // Prepares the environment for running the command (e.g., parsing the command to produce the
+ // invocation and extracting read/write concerns).
+ Status _prologue();
+
+ // Returns a future-chain that runs the parsed invocation.
+ Future<void> _runInvocation();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<BSONObjBuilder> _errorBuilder;
+ const NetworkOp _opType;
+ const StringData _commandName;
+
+ std::shared_ptr<CommandInvocation> _invocation;
+ boost::optional<std::string> _ns;
+ boost::optional<OperationSessionInfoFromClient> _osi;
+ boost::optional<WriteConcernOptions> _wc;
+ boost::optional<bool> _isHello;
+};
+
+/*
+ * Produces a future-chain to run the invocation and capture the result in replyBuilder.
+ */
+class ParseAndRunCommand::RunInvocation final
+ : public std::enable_shared_from_this<ParseAndRunCommand::RunInvocation> {
+public:
+ RunInvocation(RunInvocation&&) = delete;
+ RunInvocation(const RunInvocation&) = delete;
+
+ explicit RunInvocation(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {}
+
+ ~RunInvocation() {
+ if (!_shouldAffectCommandCounter)
+ return;
+ auto opCtx = _parc->_rec->getOpCtx();
+ Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh(
+ opCtx, mongo::LogicalOp::opCommand);
+ }
+
+ Future<void> run();
+
+private:
+ Status _setup();
+
+ // Returns a future-chain that runs the invocation and retries if necessary.
+ Future<void> _runAndRetry();
+
+ // Logs and updates statistics if an error occurs during `_setup()` or `_runAndRetry()`.
+ void _tapOnError(const Status& status);
+
+ const std::shared_ptr<ParseAndRunCommand> _parc;
+
+ boost::optional<RouterOperationContextSession> _routerSession;
+ bool _shouldAffectCommandCounter = false;
+};
+
+/*
+ * Produces a future-chain that runs the invocation and retries if necessary.
+ */
+class ParseAndRunCommand::RunAndRetry final
+ : public std::enable_shared_from_this<ParseAndRunCommand::RunAndRetry> {
+public:
+ RunAndRetry(RunAndRetry&&) = delete;
+ RunAndRetry(const RunAndRetry&) = delete;
+
+ explicit RunAndRetry(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {}
+
+ Future<void> run();
+
+private:
+ bool _canRetry() const {
+ return _tries < kMaxNumStaleVersionRetries;
+ }
+
+ // Sets up the environment for running the invocation, and clears the state from the last try.
+ void _setup();
+
+ Future<void> _run();
+
+ // Exception handler for error codes that may trigger a retry. All methods will throw `status`
+ // unless an attempt to retry is possible.
+ void _checkRetryForTransaction(Status& status);
+ void _onShardInvalidatedForTargeting(Status& status);
+ void _onNeedRetargetting(Status& status);
+ void _onStaleDbVersion(Status& status);
+ void _onSnapshotError(Status& status);
+
+ const std::shared_ptr<ParseAndRunCommand> _parc;
+
+ int _tries = 0;
+};
+
+Status ParseAndRunCommand::_prologue() {
+ auto opCtx = _rec->getOpCtx();
+ const auto& m = _rec->getMessage();
+ const auto& request = _rec->getRequest();
+ auto replyBuilder = _rec->getReplyBuilder();
+
+ auto const command = CommandHelpers::findCommand(_commandName);
if (!command) {
+ const std::string errorMsg = "no such cmd: {}"_format(_commandName);
auto builder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- builder,
- {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName});
+ CommandHelpers::appendCommandStatusNoThrow(builder,
+ {ErrorCodes::CommandNotFound, errorMsg});
globalCommandRegistry()->incrementUnknownCommands();
appendRequiredFieldsToResponse(opCtx, &builder);
- return;
+ return {ErrorCodes::SkipCommandExecution, errorMsg};
}
- const auto isHello = command->getName() == "hello"_sd || command->getName() == "isMaster"_sd;
+ _rec->setCommand(command);
+
+ _isHello.emplace(command->getName() == "hello"_sd || command->getName() == "isMaster"_sd);
opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
const auto session = opCtx->getClient()->session();
if (session) {
- if (!opCtx->isExhaust() || !isHello) {
- InExhaustHello::get(session.get())->setInExhaust(false, commandName);
+ if (!opCtx->isExhaust() || !_isHello.get()) {
+ InExhaustHello::get(session.get())->setInExhaust(false, _commandName);
}
}
@@ -348,30 +521,31 @@ void runCommand(OperationContext* opCtx,
opCtx->setComment(commentField.wrap());
}
- std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
- CommandInvocation::set(opCtx, invocation);
+ _invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, _invocation);
// Set the logical optype, command object and namespace as soon as we identify the command. If
// the command does not define a fully-qualified namespace, set CurOp to the generic command
// namespace db.$cmd.
- std::string ns = invocation->ns().toString();
- auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns));
+ _ns.emplace(_invocation->ns().toString());
+ auto nss =
+ (request.getDatabase() == *_ns ? NamespaceString(*_ns, "$cmd") : NamespaceString(*_ns));
// Fill out all currentOp details.
- CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType);
+ CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, _opType);
- auto osi = initializeOperationSessionInfo(opCtx,
- request.body,
- command->requiresAuth(),
- command->attachLogicalSessionsToOpCtx(),
- true);
+ _osi.emplace(initializeOperationSessionInfo(opCtx,
+ request.body,
+ command->requiresAuth(),
+ command->attachLogicalSessionsToOpCtx(),
+ true));
// TODO SERVER-28756: Change allowTransactionsOnConfigDatabase to true once we fix the bug
// where the mongos custom write path incorrectly drops the client's txnNumber.
auto allowTransactionsOnConfigDatabase = false;
- validateSessionOptions(osi, command->getName(), nss, allowTransactionsOnConfigDatabase);
+ validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase);
- auto wc = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body));
+ _wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)));
Client* client = opCtx->getClient();
auto const apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
@@ -389,519 +563,511 @@ void runCommand(OperationContext* opCtx,
if (!readConcernParseStatus.isOK()) {
auto builder = replyBuilder->getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus);
- return;
+ return {ErrorCodes::SkipCommandExecution, "Failed to parse read concern"};
}
- try {
- if (isHello) {
- // Preload generic ClientMetadata ahead of our first hello request. After the first
- // request, metaElement should always be empty.
- auto metaElem = request.body[kMetadataDocumentName];
- ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem);
- }
-
- auto& apiParams = APIParameters::get(opCtx);
- auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
- if (auto clientMetadata = ClientMetadata::get(client)) {
- auto appName = clientMetadata->getApplicationName().toString();
- apiVersionMetrics.update(appName, apiParams);
- }
-
- rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
-
- CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
-
- boost::optional<RouterOperationContextSession> routerSession;
- bool startTransaction = false;
- if (osi.getAutocommit()) {
- routerSession.emplace(opCtx);
-
- auto txnRouter = TransactionRouter::get(opCtx);
- invariant(txnRouter);
+ return Status::OK();
+}
- auto txnNumber = opCtx->getTxnNumber();
- invariant(txnNumber);
+Status ParseAndRunCommand::RunInvocation::_setup() {
+ auto invocation = _parc->_invocation;
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto command = _parc->_rec->getCommand();
+ const auto& request = _parc->_rec->getRequest();
+ auto replyBuilder = _parc->_rec->getReplyBuilder();
+
+ auto appendStatusToReplyAndSkipCommandExecution = [replyBuilder](Status status) -> Status {
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(responseBuilder, status);
+ return Status(ErrorCodes::SkipCommandExecution, status.reason());
+ };
+
+ if (_parc->_isHello.get()) {
+ // Preload generic ClientMetadata ahead of our first hello request. After the first
+ // request, metaElement should always be empty.
+ auto metaElem = request.body[kMetadataDocumentName];
+ ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem);
+ }
- auto transactionAction = ([&] {
- auto startTxnSetting = osi.getStartTransaction();
- if (startTxnSetting && *startTxnSetting) {
- return TransactionRouter::TransactionActions::kStart;
- }
+ auto& apiParams = APIParameters::get(opCtx);
+ auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
+ if (auto clientMetadata = ClientMetadata::get(opCtx->getClient())) {
+ auto appName = clientMetadata->getApplicationName().toString();
+ apiVersionMetrics.update(appName, apiParams);
+ }
- if (command->getName() == CommitTransaction::kCommandName) {
- return TransactionRouter::TransactionActions::kCommit;
- }
+ rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
- return TransactionRouter::TransactionActions::kContinue;
- })();
+ CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
+ bool startTransaction = false;
+ if (_parc->_osi->getAutocommit()) {
+ _routerSession.emplace(opCtx);
- startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
- txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
- }
+ auto txnRouter = TransactionRouter::get(opCtx);
+ invariant(txnRouter);
- bool supportsWriteConcern = invocation->supportsWriteConcern();
- if (!supportsWriteConcern &&
- request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
- // This command doesn't do writes so it should not be passed a writeConcern.
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern"));
- return;
- }
+ auto txnNumber = opCtx->getTxnNumber();
+ invariant(txnNumber);
- bool clientSuppliedWriteConcern = !wc.usedDefault;
- bool customDefaultWriteConcernWasApplied = false;
-
- if (supportsWriteConcern && !clientSuppliedWriteConcern &&
- (!TransactionRouter::get(opCtx) || isTransactionCommand(commandName))) {
- // This command supports WC, but wasn't given one - so apply the default, if there is
- // one.
- if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
- .getDefaultWriteConcern(opCtx)) {
- wc = *wcDefault;
- customDefaultWriteConcernWasApplied = true;
- LOGV2_DEBUG(22766,
- 2,
- "Applying default writeConcern on {command} of {writeConcern}",
- "Applying default writeConcern on command",
- "command"_attr = request.getCommandName(),
- "writeConcern"_attr = *wcDefault);
+ auto transactionAction = ([&] {
+ auto startTxnSetting = _parc->_osi->getStartTransaction();
+ if (startTxnSetting && *startTxnSetting) {
+ return TransactionRouter::TransactionActions::kStart;
}
- }
- if (TransactionRouter::get(opCtx)) {
- validateWriteConcernForTransaction(wc, commandName);
- }
-
- if (supportsWriteConcern) {
- auto& provenance = wc.getProvenance();
-
- // ClientSupplied is the only provenance that clients are allowed to pass to mongos.
- if (provenance.hasSource() && !provenance.isClientSupplied()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status{ErrorCodes::InvalidOptions,
- "writeConcern provenance must be unset or \"{}\""_format(
- ReadWriteConcernProvenance::kClientSupplied)});
- return;
+ if (command->getName() == CommitTransaction::kCommandName) {
+ return TransactionRouter::TransactionActions::kCommit;
}
- // If the client didn't provide a provenance, then an appropriate value needs to be
- // determined.
- if (!provenance.hasSource()) {
- if (clientSuppliedWriteConcern) {
- provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
- } else if (customDefaultWriteConcernWasApplied) {
- provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
- } else {
- provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
- }
- }
+ return TransactionRouter::TransactionActions::kContinue;
+ })();
- // Ensure that the WC being set on the opCtx has provenance.
- invariant(wc.getProvenance().hasSource(),
- str::stream()
- << "unexpected unset provenance on writeConcern: " << wc.toBSON());
+ startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
+ }
- opCtx->setWriteConcern(wc);
- }
+ bool supportsWriteConcern = invocation->supportsWriteConcern();
+ if (!supportsWriteConcern && request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
+ // This command doesn't do writes so it should not be passed a writeConcern.
+ const auto errorMsg = "Command does not support writeConcern";
+ return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg});
+ }
- bool clientSuppliedReadConcern = readConcernArgs.isSpecified();
- bool customDefaultReadConcernWasApplied = false;
-
- auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
- if (readConcernSupport.defaultReadConcernPermit.isOK() &&
- (startTransaction || !TransactionRouter::get(opCtx))) {
- if (readConcernArgs.isEmpty()) {
- const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
- .getDefaultReadConcern(opCtx);
- if (rcDefault) {
- {
- // We must obtain the client lock to set ReadConcernArgs, because it's an
- // in-place reference to the object on the operation context, which may be
- // concurrently used elsewhere (eg. read by currentOp).
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = std::move(*rcDefault);
- }
- customDefaultReadConcernWasApplied = true;
- LOGV2_DEBUG(22767,
- 2,
- "Applying default readConcern on {command} of {readConcern}",
- "Applying default readConcern on command",
- "command"_attr = invocation->definition()->getName(),
- "readConcern"_attr = *rcDefault);
- // Update the readConcernSupport, since the default RC was applied.
- readConcernSupport =
- invocation->supportsReadConcern(readConcernArgs.getLevel());
- }
- }
+ bool clientSuppliedWriteConcern = !_parc->_wc->usedDefault;
+ bool customDefaultWriteConcernWasApplied = false;
+
+ if (supportsWriteConcern && !clientSuppliedWriteConcern &&
+ (!TransactionRouter::get(opCtx) || isTransactionCommand(_parc->_commandName))) {
+ // This command supports WC, but wasn't given one - so apply the default, if there is one.
+ if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
+ .getDefaultWriteConcern(opCtx)) {
+ _parc->_wc = *wcDefault;
+ customDefaultWriteConcernWasApplied = true;
+ LOGV2_DEBUG(22766,
+ 2,
+ "Applying default writeConcern on {command} of {writeConcern}",
+ "Applying default writeConcern on command",
+ "command"_attr = request.getCommandName(),
+ "writeConcern"_attr = *wcDefault);
}
+ }
+
+ if (TransactionRouter::get(opCtx)) {
+ validateWriteConcernForTransaction(*_parc->_wc, _parc->_commandName);
+ }
- auto& provenance = readConcernArgs.getProvenance();
+ if (supportsWriteConcern) {
+ auto& provenance = _parc->_wc->getProvenance();
// ClientSupplied is the only provenance that clients are allowed to pass to mongos.
if (provenance.hasSource() && !provenance.isClientSupplied()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status{ErrorCodes::InvalidOptions,
- "readConcern provenance must be unset or \"{}\""_format(
- ReadWriteConcernProvenance::kClientSupplied)});
- return;
+ const auto errorMsg = "writeConcern provenance must be unset or \"{}\""_format(
+ ReadWriteConcernProvenance::kClientSupplied);
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
}
// If the client didn't provide a provenance, then an appropriate value needs to be
// determined.
if (!provenance.hasSource()) {
- // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs
- // as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- if (clientSuppliedReadConcern) {
+ if (clientSuppliedWriteConcern) {
provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
- } else if (customDefaultReadConcernWasApplied) {
+ } else if (customDefaultWriteConcernWasApplied) {
provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
} else {
provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
}
}
- // Ensure that the RC on the opCtx has provenance.
- invariant(readConcernArgs.getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on readConcern: "
- << readConcernArgs.toBSONInner());
-
- // If we are starting a transaction, we only need to check whether the read concern is
- // appropriate for running a transaction. There is no need to check whether the specific
- // command supports the read concern, because all commands that are allowed to run in a
- // transaction must support all applicable read concerns.
- if (startTransaction) {
- if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- {ErrorCodes::InvalidOptions,
- "The readConcern level must be either 'local' (default), 'majority' or "
- "'snapshot' in order to run in a transaction"});
- return;
- }
- if (readConcernArgs.getArgsOpTime()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- {ErrorCodes::InvalidOptions,
- str::stream()
- << "The readConcern cannot specify '"
- << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"});
- return;
- }
- }
+ // Ensure that the WC being set on the opCtx has provenance.
+ invariant(_parc->_wc->getProvenance().hasSource(),
+ "unexpected unset provenance on writeConcern: {}"_format(
+ _parc->_wc->toBSON().toString()));
- // Otherwise, if there is a read concern present - either user-specified or the default -
- // then check whether the command supports it. If there is no explicit read concern level,
- // then it is implicitly "local". There is no need to check whether this is supported,
- // because all commands either support "local" or upconvert the absent readConcern to a
- // stronger level that they do support; e.g. $changeStream upconverts to RC "majority".
- //
- // Individual transaction statements are checked later on, after we've unstashed the
- // transaction resources.
- if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) {
- if (!readConcernSupport.readConcernSupport.isOK()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- readConcernSupport.readConcernSupport.withContext(
- str::stream() << "Command " << invocation->definition()->getName()
- << " does not support " << readConcernArgs.toString()));
- return;
+ opCtx->setWriteConcern(*_parc->_wc);
+ }
+
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ bool clientSuppliedReadConcern = readConcernArgs.isSpecified();
+ bool customDefaultReadConcernWasApplied = false;
+
+ auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
+ if (readConcernSupport.defaultReadConcernPermit.isOK() &&
+ (startTransaction || !TransactionRouter::get(opCtx))) {
+ if (readConcernArgs.isEmpty()) {
+ const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
+ .getDefaultReadConcern(opCtx);
+ if (rcDefault) {
+ {
+ // We must obtain the client lock to set ReadConcernArgs, because it's an
+ // in-place reference to the object on the operation context, which may be
+ // concurrently used elsewhere (eg. read by currentOp).
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = std::move(*rcDefault);
+ }
+ customDefaultReadConcernWasApplied = true;
+ LOGV2_DEBUG(22767,
+ 2,
+ "Applying default readConcern on {command} of {readConcern}",
+ "Applying default readConcern on command",
+ "command"_attr = invocation->definition()->getName(),
+ "readConcern"_attr = *rcDefault);
+ // Update the readConcernSupport, since the default RC was applied.
+ readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
}
}
+ }
- // Remember whether or not this operation is starting a transaction, in case something later
- // in the execution needs to adjust its behavior based on this.
- opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
+ auto& provenance = readConcernArgs.getProvenance();
- command->incrementCommandsExecuted();
+ // ClientSupplied is the only provenance that clients are allowed to pass to mongos.
+ if (provenance.hasSource() && !provenance.isClientSupplied()) {
+ const auto errorMsg = "readConcern provenance must be unset or \"{}\""_format(
+ ReadWriteConcernProvenance::kClientSupplied);
+ return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg});
+ }
+
+ // If the client didn't provide a provenance, then an appropriate value needs to be determined.
+ if (!provenance.hasSource()) {
+ // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs as it
+ // may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ if (clientSuppliedReadConcern) {
+ provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
+ } else if (customDefaultReadConcernWasApplied) {
+ provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
+ } else {
+ provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
+ }
+ }
- auto shouldAffectCommandCounter = command->shouldAffectCommandCounter();
+ // Ensure that the RC on the opCtx has provenance.
+ invariant(readConcernArgs.getProvenance().hasSource(),
+ "unexpected unset provenance on readConcern: {}"_format(
+ readConcernArgs.toBSONInner().toString()));
+
+ // If we are starting a transaction, we only need to check whether the read concern is
+ // appropriate for running a transaction. There is no need to check whether the specific command
+ // supports the read concern, because all commands that are allowed to run in a transaction must
+ // support all applicable read concerns.
+ if (startTransaction) {
+ if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) {
+ const auto errorMsg =
+ "The readConcern level must be either 'local' (default), 'majority' or "
+ "'snapshot' in order to run in a transaction";
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
+ }
+ if (readConcernArgs.getArgsOpTime()) {
+ const std::string errorMsg =
+ "The readConcern cannot specify '{}' in a transaction"_format(
+ repl::ReadConcernArgs::kAfterOpTimeFieldName);
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
+ }
+ }
- if (shouldAffectCommandCounter) {
- globalOpCounters.gotCommand();
+ // Otherwise, if there is a read concern present - either user-specified or the default - then
+ // check whether the command supports it. If there is no explicit read concern level, then it is
+ // implicitly "local". There is no need to check whether this is supported, because all commands
+ // either support "local" or upconvert the absent readConcern to a stronger level that they do
+ // support; e.g. $changeStream upconverts to RC "majority".
+ //
+ // Individual transaction statements are checked later on, after we've unstashed the transaction
+ // resources.
+ if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) {
+ if (!readConcernSupport.readConcernSupport.isOK()) {
+ const std::string errorMsg = "Command {} does not support {}"_format(
+ invocation->definition()->getName(), readConcernArgs.toString());
+ return appendStatusToReplyAndSkipCommandExecution(
+ readConcernSupport.readConcernSupport.withContext(errorMsg));
}
+ }
- ON_BLOCK_EXIT([opCtx, shouldAffectCommandCounter] {
- if (shouldAffectCommandCounter) {
- Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh(
- opCtx, mongo::LogicalOp::opCommand);
- }
- });
+ // Remember whether or not this operation is starting a transaction, in case something later in
+ // the execution needs to adjust its behavior based on this.
+ opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
+ command->incrementCommandsExecuted();
- for (int tries = 0;; ++tries) {
- // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
- bool canRetry = tries < kMaxNumStaleVersionRetries - 1;
+ if (command->shouldAffectCommandCounter()) {
+ globalOpCounters.gotCommand();
+ _shouldAffectCommandCounter = true;
+ }
- if (tries > 0) {
- // Re-parse before retrying in case the process of run()-ning the
- // invocation could affect the parsed result.
- invocation = command->parse(opCtx, request);
- invariant(invocation->ns().toString() == ns,
- "unexpected change of namespace when retrying");
- }
+ return Status::OK();
+}
- // On each try, select the latest known clusterTime as the atClusterTime for snapshot
- // reads outside of transactions.
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
- !TransactionRouter::get(opCtx) &&
- (!readConcernArgs.getArgsAtClusterTime() ||
- readConcernArgs.wasAtClusterTimeSelected())) {
- auto atClusterTime = [&] {
- const auto latestKnownTime = VectorClock::get(opCtx)->getTime();
- // Choose a time after the user-supplied afterClusterTime.
- auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
- if (afterClusterTime && *afterClusterTime > latestKnownTime.clusterTime()) {
- return afterClusterTime->asTimestamp();
- }
- return latestKnownTime.clusterTime().asTimestamp();
- }();
- readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
+void ParseAndRunCommand::RunAndRetry::_setup() {
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto command = _parc->_rec->getCommand();
+ const auto& request = _parc->_rec->getRequest();
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+
+ if (_tries > 1) {
+ // Re-parse before retrying in case the process of run()-ning the invocation could affect
+ // the parsed result.
+ _parc->_invocation = command->parse(opCtx, request);
+ invariant(_parc->_invocation->ns().toString() == _parc->_ns,
+ "unexpected change of namespace when retrying");
+ }
+
+ // On each try, select the latest known clusterTime as the atClusterTime for snapshot reads
+ // outside of transactions.
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !TransactionRouter::get(opCtx) &&
+ (!readConcernArgs.getArgsAtClusterTime() || readConcernArgs.wasAtClusterTimeSelected())) {
+ auto atClusterTime = [](OperationContext* opCtx, ReadConcernArgs& readConcernArgs) {
+ const auto latestKnownTime = VectorClock::get(opCtx)->getTime();
+ // Choose a time after the user-supplied afterClusterTime.
+ auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > latestKnownTime.clusterTime()) {
+ return afterClusterTime->asTimestamp();
}
+ return latestKnownTime.clusterTime().asTimestamp();
+ }(opCtx, readConcernArgs);
+ readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
+ }
- replyBuilder->reset();
- try {
- execCommandClient(opCtx, invocation.get(), request, replyBuilder);
+ _parc->_rec->getReplyBuilder()->reset();
+}
- auto responseBuilder = replyBuilder->getBodyBuilder();
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter.appendRecoveryToken(&responseBuilder);
- }
+Future<void> ParseAndRunCommand::RunAndRetry::_run() {
+ auto ecc = std::make_shared<ExecCommandClient>(_parc->_rec, _parc->_invocation);
+ return ecc->run().then([rec = _parc->_rec] {
+ auto opCtx = rec->getOpCtx();
+ auto responseBuilder = rec->getReplyBuilder()->getBodyBuilder();
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.appendRecoveryToken(&responseBuilder);
+ }
+ });
+}
- return;
- } catch (ShardInvalidatedForTargetingException& ex) {
- auto catalogCache = Grid::get(opCtx)->catalogCache();
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) {
+ // Retry logic specific to transactions. Throws and aborts the transaction if the error cannot
+ // be retried on.
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (!txnRouter)
+ return;
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- (void)catalogCache->getCollectionRoutingInfoWithRefresh(
- opCtx, ex.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss());
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto abortGuard = makeGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); });
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ if (!_canRetry()) {
+ addContextForTransactionAbortingError(
+ txnRouter.txnIdToString(), txnRouter.getLatestStmtId(), status, "exhausted retries");
+ iassert(status);
+ }
- abortGuard.dismiss();
- continue;
- }
+ // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, or shard
+ // invalidated for targeting errors.
+ if (ErrorCodes::isA<ErrorCategory::SnapshotError>(status)) {
+ if (!txnRouter.canContinueOnSnapshotError()) {
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ status,
+ "a non-retryable snapshot error");
+ iassert(status);
+ }
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
- const auto staleNs = [&] {
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- return staleInfo->getNss();
- }
- throw;
- }();
+ // The error is retryable, so update transaction state before retrying.
+ txnRouter.onSnapshotError(opCtx, status);
+ } else {
+ invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status) ||
+ status.code() == ErrorCodes::ShardInvalidatedForTargeting ||
+ status.code() == ErrorCodes::StaleDbVersion);
+ if (!txnRouter.canContinueOnStaleShardOrDbError(_parc->_commandName, status)) {
+ if (status.code() == ErrorCodes::ShardInvalidatedForTargeting) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
- } else {
- // If we don't have the stale config info and therefore don't know the shard's
- // id, we have to force all further targetting requests for the namespace to
- // block on a refresh.
- catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs);
- }
+ (void)catalogCache->getCollectionRoutingInfoWithRefresh(
+ opCtx, status.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss());
+ }
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ status,
+ "an error from cluster data placement change");
+ iassert(status);
+ }
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ // The error is retryable, so update transaction state before retrying.
+ txnRouter.onStaleShardOrDbError(opCtx, _parc->_commandName, status);
+ }
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
+ abortGuard.dismiss();
+}
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+void ParseAndRunCommand::RunAndRetry::_onShardInvalidatedForTargeting(Status& status) {
+ invariant(status.code() == ErrorCodes::ShardInvalidatedForTargeting);
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ _checkRetryForTransaction(status);
- abortGuard.dismiss();
- continue;
- }
+ if (!_canRetry())
+ iassert(status);
+}
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
- // Mark database entry in cache as stale.
- Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(),
- ex->getVersionWanted());
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) {
+ invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status));
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto staleInfo = status.extraInfo<StaleConfigInfo>();
+ if (!staleInfo)
+ iassert(status);
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto staleNs = staleInfo->getNss();
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
- abortGuard.dismiss();
- continue;
- }
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
- // Simple retry on any type of snapshot error.
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+ _checkRetryForTransaction(status);
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnSnapshotError()) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "a non-retryable snapshot error");
- throw;
- }
+ if (!_canRetry())
+ iassert(status);
+}
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onSnapshotError(opCtx, ex.toStatus());
+void ParseAndRunCommand::RunAndRetry::_onStaleDbVersion(Status& status) {
+ invariant(status.code() == ErrorCodes::StaleDbVersion);
+ auto opCtx = _parc->_rec->getOpCtx();
- abortGuard.dismiss();
- continue;
- } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) {
- // Non-transaction snapshot read. The client sent readConcern: {level:
- // "snapshot", atClusterTime: T}, where T is older than
- // minSnapshotHistoryWindowInSeconds, retrying won't succeed.
- throw;
- }
+ // Mark database entry in cache as stale.
+ auto extraInfo = status.extraInfo<StaleDbRoutingVersion>();
+ invariant(extraInfo);
+ Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(extraInfo->getDb(),
+ extraInfo->getVersionWanted());
- if (canRetry) {
- continue;
- }
- throw;
- }
- MONGO_UNREACHABLE;
- }
- } catch (const DBException& e) {
- command->incrementCommandsFailed();
- LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason());
- // WriteConcern error (wcCode) is set to boost::none because:
- // 1. TransientTransaction error label handling for commitTransaction command in mongos is
- // delegated to the shards. Mongos simply propagates the shard's response up to the
- // client.
- // 2. For other commands in a transaction, they shouldn't get a writeConcern error so
- // this setting doesn't apply.
- //
- // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError
- // label.
- auto errorLabels = getErrorLabels(
- opCtx, osi, command->getName(), e.code(), boost::none, true /* isInternalClient */);
- errorBuilder->appendElements(errorLabels);
- throw;
+ _checkRetryForTransaction(status);
+
+ if (!_canRetry())
+ iassert(status);
+}
+
+void ParseAndRunCommand::RunAndRetry::_onSnapshotError(Status& status) {
+ // Simple retry on any type of snapshot error.
+ invariant(ErrorCodes::isA<ErrorCategory::SnapshotError>(status));
+
+ _checkRetryForTransaction(status);
+
+ auto opCtx = _parc->_rec->getOpCtx();
+ if (auto txnRouter = TransactionRouter::get(opCtx);
+ !txnRouter && !ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) {
+ // Non-transaction snapshot read. The client sent readConcern: {level: "snapshot",
+ // atClusterTime: T}, where T is older than minSnapshotHistoryWindowInSeconds, retrying
+ // won't succeed.
+ iassert(status);
}
+
+ if (!_canRetry())
+ iassert(status);
+}
+
+void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) {
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto command = _parc->_rec->getCommand();
+
+ command->incrementCommandsFailed();
+ LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason());
+ // WriteConcern error (wcCode) is set to boost::none because:
+ // 1. TransientTransaction error label handling for commitTransaction command in mongos is
+ // delegated to the shards. Mongos simply propagates the shard's response up to the client.
+ // 2. For other commands in a transaction, they shouldn't get a writeConcern error so this
+ // setting doesn't apply.
+ //
+ // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError
+ // label.
+ auto errorLabels = getErrorLabels(opCtx,
+ *_parc->_osi,
+ command->getName(),
+ status.code(),
+ boost::none,
+ true /* isInternalClient */);
+ _parc->_errorBuilder->appendElements(errorLabels);
+}
+
+Future<void> ParseAndRunCommand::RunInvocation::_runAndRetry() {
+ auto instance = std::make_shared<RunAndRetry>(_parc);
+ return instance->run();
+}
+
+Future<void> ParseAndRunCommand::_runInvocation() {
+ auto ri = std::make_shared<RunInvocation>(shared_from_this());
+ return ri->run();
+}
+
+Future<void> ParseAndRunCommand::RunAndRetry::run() {
+ // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
+ _tries++;
+
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { _setup(); })
+ .then([this, anchor = shared_from_this()] {
+ return _run()
+ .onError<ErrorCodes::ShardInvalidatedForTargeting>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onShardInvalidatedForTargeting(status);
+ return run(); // Retry
+ })
+ .onErrorCategory<ErrorCategory::NeedRetargettingError>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onNeedRetargetting(status);
+ return run(); // Retry
+ })
+ .onError<ErrorCodes::StaleDbVersion>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onStaleDbVersion(status);
+ return run(); // Retry
+ })
+ .onErrorCategory<ErrorCategory::SnapshotError>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onSnapshotError(status);
+ return run(); // Retry
+ });
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
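// The run() above retries by re-invoking itself from each onError continuation,
// with `anchor = shared_from_this()` keeping the retry state alive across hops.
// A standalone sketch of that shape; the futures are replaced by a plain
// callback, and all names here are toy stand-ins, not mongo types:
#include <functional>
#include <iostream>
#include <memory>

class RetrySketch : public std::enable_shared_from_this<RetrySketch> {
public:
    explicit RetrySketch(std::function<bool()> attempt) : _attempt(std::move(attempt)) {}

    void run(std::function<void(bool)> done) {
        ++_tries;
        const bool ok = _attempt();
        if (!ok && _canRetry()) {
            auto anchor = shared_from_this();  // keeps *this alive, as in run() above
            anchor->run(std::move(done));      // retry by re-invoking run()
            return;
        }
        done(ok);
    }

private:
    bool _canRetry() const { return _tries < 3; }
    std::function<bool()> _attempt;
    int _tries = 0;
};

int main() {
    int failuresLeft = 2;  // fail twice, then succeed
    auto instance = std::make_shared<RetrySketch>([&] { return failuresLeft-- == 0; });
    instance->run([](bool ok) { std::cout << (ok ? "ok\n" : "gave up\n"); });
}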
+
+Future<void> ParseAndRunCommand::RunInvocation::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future =
+ std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _setup(); })
+ .then([this, anchor = shared_from_this()] { return _runAndRetry(); })
+ .tapError([this, anchor = shared_from_this()](Status status) { _tapOnError(status); });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+Future<void> ParseAndRunCommand::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _prologue(); })
+ .then([this, anchor = shared_from_this()] { return _runInvocation(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ if (status.code() == ErrorCodes::SkipCommandExecution)
+ // We've already skipped execution, so no other action is required.
+ return Status::OK();
+ return status;
+ });
+ pf.promise.emplaceValue();
+ return future;
}
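// Every run() in this patch uses the same bootstrap: build the chain on a
// pending future, then emplaceValue() on the promise to start it, so a throw
// from any step becomes a future error rather than escaping the caller. A
// standalone sketch of that idea with a toy chain type (not mongo's Future):
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct ToyStatus {
    bool ok = true;
    std::string reason;
};

class ToyChain {
public:
    ToyChain& then(std::function<void()> step) {
        _steps.push_back(std::move(step));
        return *this;
    }
    // Plays the role of pf.promise.emplaceValue(): kicks off the chain.
    ToyStatus start() {
        try {
            for (auto& step : _steps)
                step();
            return {};
        } catch (const std::exception& ex) {
            return {false, ex.what()};  // the error travels as a value, not a throw
        }
    }

private:
    std::vector<std::function<void()>> _steps;
};

int main() {
    ToyChain chain;
    chain.then([] { std::cout << "prologue\n"; })
        .then([] { throw std::runtime_error("parse failure"); })
        .then([] { std::cout << "never reached\n"; });
    const ToyStatus status = chain.start();
    std::cout << (status.ok ? "ok" : "captured: " + status.reason) << '\n';
}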
/**
- * Attaches the topology version to the response.
+ * Executes the command for the given request, and appends the result to replyBuilder
+ * and error labels, if any, to errorBuilder.
*/
-void attachTopologyVersionDuringShutdown(OperationContext* opCtx,
- const DBException& ex,
- BSONObjBuilder* errorBuilder) {
- // Only attach the topology version if the mongos is in quiesce mode. If the mongos is in
- // quiesce mode, this shutdown error is due to mongos rather than a shard.
- auto code = ex.code();
- if (code && ErrorCodes::isA<ErrorCategory::ShutdownError>(code)) {
- if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx);
- mongosTopCoord && mongosTopCoord->inQuiesceMode()) {
- // Append the topology version to the response.
- const auto topologyVersion = mongosTopCoord->getTopologyVersion();
- BSONObjBuilder topologyVersionBuilder(errorBuilder->subobjStart("topologyVersion"));
- topologyVersion.serialize(&topologyVersionBuilder);
- }
- }
+Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<BSONObjBuilder> errorBuilder) {
+ auto instance = std::make_shared<ParseAndRunCommand>(std::move(rec), std::move(errorBuilder));
+ return instance->run();
}
} // namespace
@@ -1024,75 +1190,118 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss
cursorId)};
}
-DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
- auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(m));
- BSONObjBuilder errorBuilder;
+// Maintains the state required to execute client commands, and provides the interface to construct
+// a future-chain that runs the command against the database.
+class ClientCommand final : public std::enable_shared_from_this<ClientCommand> {
+public:
+ ClientCommand(ClientCommand&&) = delete;
+ ClientCommand(const ClientCommand&) = delete;
- bool propagateException = false;
+ explicit ClientCommand(std::shared_ptr<RequestExecutionContext> rec)
+ : _rec(std::move(rec)), _errorBuilder(std::make_shared<BSONObjBuilder>()) {}
- try {
- // Parse.
- OpMsgRequest request = [&] {
- try {
- return rpc::opMsgRequestFromAnyProtocol(m);
- } catch (const DBException& ex) {
- // If this error needs to fail the connection, propagate it out.
- if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
- propagateException = true;
-
- LOGV2_DEBUG(22769,
- 1,
- "Exception thrown while parsing command {error}",
- "Exception thrown while parsing command",
- "error"_attr = redact(ex));
- throw;
- }
- }();
+ // Returns the future-chain that produces the response by parsing and executing the command.
+ Future<DbResponse> run();
- // Execute.
- std::string db = request.getDatabase().toString();
- try {
- LOGV2_DEBUG(22770,
- 3,
- "Command begin db: {db} msg id: {headerId}",
- "Command begin",
- "db"_attr = db,
- "headerId"_attr = m.header().getId());
- runCommand(opCtx, request, m, reply.get(), &errorBuilder);
+private:
+ void _parse();
+
+ Future<void> _execute();
+
+    // Handles exceptions thrown while parsing or executing the command.
+ Future<void> _handleException(Status);
+
+ // Extracts the command response from the replyBuilder.
+ DbResponse _produceResponse();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<BSONObjBuilder> _errorBuilder;
+
+ bool _propagateException = false;
+};
+
+void ClientCommand::_parse() try {
+ const auto& msg = _rec->getMessage();
+ _rec->setReplyBuilder(rpc::makeReplyBuilder(rpc::protocolForMessage(msg)));
+ _rec->setRequest(rpc::opMsgRequestFromAnyProtocol(msg));
+} catch (const DBException& ex) {
+ // If this error needs to fail the connection, propagate it out.
+ if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
+ _propagateException = true;
+
+ LOGV2_DEBUG(22769,
+ 1,
+ "Exception thrown while parsing command {error}",
+ "Exception thrown while parsing command",
+ "error"_attr = redact(ex));
+ throw;
+}
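// _parse is written as a C++ function-try-block, so the catch clause covers the
// whole body and can log-and-rethrow in one place. A minimal standalone example
// of that construct (parseMessage is a made-up stand-in, not a mongo function):
#include <iostream>
#include <stdexcept>

void parseMessage(bool malformed) try {
    if (malformed)
        throw std::runtime_error("bad message");
    std::cout << "parsed\n";
} catch (const std::exception& ex) {
    std::cout << "logged: " << ex.what() << '\n';
    throw;  // rethrow to the caller, as _parse does
}

int main() {
    try {
        parseMessage(true);
    } catch (const std::exception&) {
        std::cout << "propagated to caller\n";
    }
}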
+
+Future<void> ClientCommand::_execute() {
+ LOGV2_DEBUG(22770,
+ 3,
+ "Command begin db: {db} msg id: {headerId}",
+ "Command begin",
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId());
+
+ return runCommand(_rec, _errorBuilder)
+ .then([this, anchor = shared_from_this()] {
LOGV2_DEBUG(22771,
3,
"Command end db: {db} msg id: {headerId}",
"Command end",
- "db"_attr = db,
- "headerId"_attr = m.header().getId());
- } catch (const DBException& ex) {
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId());
+ })
+ .tapError([this, anchor = shared_from_this()](Status status) {
LOGV2_DEBUG(
22772,
1,
"Exception thrown while processing command on {db} msg id: {headerId} {error}",
"Exception thrown while processing command",
- "db"_attr = db,
- "headerId"_attr = m.header().getId(),
- "error"_attr = redact(ex));
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId(),
+ "error"_attr = redact(status));
// Record the exception in CurOp.
- CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
- throw;
- }
- } catch (const DBException& ex) {
- if (propagateException) {
- throw;
- }
+ CurOp::get(_rec->getOpCtx())->debug().errInfo = std::move(status);
+ });
+}
+
+Future<void> ClientCommand::_handleException(Status status) {
+ if (_propagateException) {
+ return status;
+ }
+
+ auto opCtx = _rec->getOpCtx();
+ auto reply = _rec->getReplyBuilder();
- reply->reset();
- auto bob = reply->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus());
- appendRequiredFieldsToResponse(opCtx, &bob);
+ reply->reset();
+ auto bob = reply->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(bob, status);
+ appendRequiredFieldsToResponse(opCtx, &bob);
- attachTopologyVersionDuringShutdown(opCtx, ex, &errorBuilder);
- bob.appendElements(errorBuilder.obj());
+    // Only attach the topology version to the response if mongos is in quiesce mode; in that
+    // case, the shutdown error is due to mongos rather than a shard.
+ if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status)) {
+ if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx);
+ mongosTopCoord && mongosTopCoord->inQuiesceMode()) {
+ // Append the topology version to the response.
+ const auto topologyVersion = mongosTopCoord->getTopologyVersion();
+ BSONObjBuilder topologyVersionBuilder(_errorBuilder->subobjStart("topologyVersion"));
+ topologyVersion.serialize(&topologyVersionBuilder);
+ }
}
+ bob.appendElements(_errorBuilder->obj());
+ return Status::OK();
+}
+
+DbResponse ClientCommand::_produceResponse() {
+ const auto& m = _rec->getMessage();
+ auto reply = _rec->getReplyBuilder();
+
if (OpMsg::isFlagSet(m, OpMsg::kMoreToCome)) {
return {}; // Don't reply.
}
@@ -1110,6 +1319,24 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
return dbResponse;
}
+Future<DbResponse> ClientCommand::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { _parse(); })
+ .then([this, anchor = shared_from_this()] { return _execute(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ return _handleException(std::move(status));
+ })
+ .then([this, anchor = shared_from_this()] { return _produceResponse(); });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) {
+ auto instance = std::make_shared<ClientCommand>(std::move(rec));
+ return instance->run();
+}
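// A hedged usage sketch for the new signature (assumes the mongo tree; opCtx
// and message would come from the transport layer, as in handleRequest below).
// Synchronous callers can still block on the returned future:
//
//     auto rec = std::make_shared<RequestExecutionContext>(opCtx, message);
//     DbResponse response = Strategy::clientCommand(std::move(rec)).get();
//
// while asynchronous callers chain continuations on it instead of blocking.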
+
DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
const int ntoreturn = dbm->pullInt();
uassert(
@@ -1230,29 +1457,27 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
}
}
-void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
- const auto& msg = dbm->msg();
- rpc::OpMsgReplyBuilder reply;
- BSONObjBuilder errorBuilder;
- runCommand(opCtx,
- [&]() {
- switch (msg.operation()) {
- case dbInsert: {
- return InsertOp::parseLegacy(msg).serialize({});
- }
- case dbUpdate: {
- return UpdateOp::parseLegacy(msg).serialize({});
- }
- case dbDelete: {
- return DeleteOp::parseLegacy(msg).serialize({});
- }
- default:
- MONGO_UNREACHABLE;
- }
- }(),
- msg,
- &reply,
- &errorBuilder); // built objects are ignored
+void Strategy::writeOp(std::shared_ptr<RequestExecutionContext> rec) {
+ rec->setRequest([msg = rec->getMessage()]() {
+ switch (msg.operation()) {
+ case dbInsert: {
+ return InsertOp::parseLegacy(msg).serialize({});
+ }
+ case dbUpdate: {
+ return UpdateOp::parseLegacy(msg).serialize({});
+ }
+ case dbDelete: {
+ return DeleteOp::parseLegacy(msg).serialize({});
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }());
+
+ rec->setReplyBuilder(std::make_unique<rpc::OpMsgReplyBuilder>());
+ runCommand(std::move(rec),
+ std::make_shared<BSONObjBuilder>()) // built objects are ignored
+ .get();
}
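// writeOp selects the request via an immediately invoked lambda, which lets a
// switch initialize a single const value. A standalone example of the idiom
// (the opcode values here are illustrative):
#include <iostream>
#include <string>

int main() {
    const int op = 2002;
    const std::string command = [&] {
        switch (op) {
            case 2002:
                return std::string{"insert"};
            case 2001:
                return std::string{"update"};
            case 2006:
                return std::string{"delete"};
            default:
                return std::string{"unknown"};
        }
    }();
    std::cout << command << '\n';
}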
void Strategy::explainFind(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h
index 3807a63a7f6..95f3f8c9449 100644
--- a/src/mongo/s/commands/strategy.h
+++ b/src/mongo/s/commands/strategy.h
@@ -33,6 +33,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/s/client/shard.h"
namespace mongo {
@@ -74,7 +75,7 @@ public:
* with the result from the operation. Doesn't send any response back and does not throw on
* errors.
*/
- static void writeOp(OperationContext* opCtx, DbMessage* dbm);
+ static void writeOp(std::shared_ptr<RequestExecutionContext> rec);
/**
* Executes a command from either OP_QUERY or OP_MSG wire protocols.
@@ -82,7 +83,7 @@ public:
* Catches StaleConfigException errors and retries the command automatically after refreshing
* the metadata for the failing namespace.
*/
- static DbResponse clientCommand(OperationContext* opCtx, const Message& message);
+ static Future<DbResponse> clientCommand(std::shared_ptr<RequestExecutionContext> rec);
/**
* Helper to run an explain of a find operation on the shards. Fills 'out' with the result of
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 2469445f418..d93bc314e2b 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -29,6 +29,8 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+#include <memory>
+
#include "mongo/platform/basic.h"
#include "mongo/s/service_entry_point_mongos.h"
@@ -40,12 +42,12 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/request_execution_context.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/message.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/strategy.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -60,16 +62,50 @@ BSONObj buildErrReply(const DBException& ex) {
} // namespace
+// Decomposes `handleRequest` into parts and simplifies composing the future-chain.
+struct HandleRequest : public std::enable_shared_from_this<HandleRequest> {
+ struct OpRunnerBase;
-Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
- const Message& message) noexcept try {
- const int32_t msgId = message.header().getId();
- const NetworkOp op = message.operation();
+ HandleRequest(OperationContext* opCtx, const Message& message)
+ : rec(std::make_shared<RequestExecutionContext>(opCtx, message)),
+ op(message.operation()),
+ msgId(message.header().getId()),
+ nsString(getNamespaceString(rec->getDbMessage())) {}
+
+ // Prepares the environment for handling the request (e.g., setting up `ClusterLastErrorInfo`).
+ void setupEnvironment();
+
+ // Returns a future that does the heavy lifting of running client commands.
+ Future<DbResponse> handleRequest();
+
+ // Runs on successful execution of the future returned by `handleRequest`.
+ void onSuccess(const DbResponse&);
+
+ // Returns a future-chain to handle the request and prepare the response.
+ Future<DbResponse> run();
+
+ static NamespaceString getNamespaceString(const DbMessage& dbmsg) {
+ if (!dbmsg.messageShouldHaveNs())
+ return {};
+ return NamespaceString(dbmsg.getns());
+ }
+
+ const std::shared_ptr<RequestExecutionContext> rec;
+ const NetworkOp op;
+ const int32_t msgId;
+ const NamespaceString nsString;
+
+ boost::optional<long long> slowMsOverride;
+};
+
+void HandleRequest::setupEnvironment() {
+ using namespace fmt::literals;
+ auto opCtx = rec->getOpCtx();
// This exception will not be returned to the caller, but will be logged and will close the
// connection
uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Message type " << op << " is not supported.",
+ "Message type {} is not supported."_format(op),
isSupportedRequestNetworkOp(op) &&
op != dbCompressed); // Decompression should be handled above us.
@@ -84,116 +120,175 @@ Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCt
AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx);
CurOp::get(opCtx)->ensureStarted();
+}
- DbMessage dbm(message);
+// The base for various operation runners that handle the request, and often generate a DbResponse.
+struct HandleRequest::OpRunnerBase {
+ explicit OpRunnerBase(std::shared_ptr<HandleRequest> hr) : hr(std::move(hr)) {}
+ virtual ~OpRunnerBase() = default;
+ virtual Future<DbResponse> run() = 0;
+ const std::shared_ptr<HandleRequest> hr;
+};
+
+struct CommandOpRunner final : public HandleRequest::OpRunnerBase {
+ using HandleRequest::OpRunnerBase::OpRunnerBase;
+ Future<DbResponse> run() override {
+ return Strategy::clientCommand(hr->rec).tap([hr = hr](const DbResponse&) {
+            // The hello/isMaster commands should take kMaxAwaitTimeMs at most; log if one
+            // takes twice that.
+ if (auto command = CurOp::get(hr->rec->getOpCtx())->getCommand();
+ command && (command->getName() == "hello" || command->getName() == "isMaster")) {
+ hr->slowMsOverride =
+ 2 * durationCount<Milliseconds>(SingleServerDiscoveryMonitor::kMaxAwaitTime);
+ }
+ });
+ }
+};
+
+// The base for operations that may throw exceptions, but should not cause the connection to close.
+struct OpRunner : public HandleRequest::OpRunnerBase {
+ using HandleRequest::OpRunnerBase::OpRunnerBase;
+ virtual DbResponse runOperation() = 0;
+ Future<DbResponse> run() override;
+};
+
+Future<DbResponse> OpRunner::run() try {
+ using namespace fmt::literals;
+ const NamespaceString& nss = hr->nsString;
+ const DbMessage& dbm = hr->rec->getDbMessage();
+
+ if (dbm.messageShouldHaveNs()) {
+ uassert(ErrorCodes::InvalidNamespace, "Invalid ns [{}]"_format(nss.ns()), nss.isValid());
+
+ uassert(ErrorCodes::IllegalOperation,
+ "Can't use 'local' database through mongos",
+ nss.db() != NamespaceString::kLocalDb);
+ }
- // This is before the try block since it handles all exceptions that should not cause the
- // connection to close.
- if (op == dbMsg || (op == dbQuery && NamespaceString(dbm.getns()).isCommand())) {
- auto dbResponse = Strategy::clientCommand(opCtx, message);
+ LOGV2_DEBUG(22867,
+ 3,
+ "Request::process begin ns: {namespace} msg id: {msgId} op: {operation}",
+ "Starting operation",
+ "namespace"_attr = nss,
+ "msgId"_attr = hr->msgId,
+ "operation"_attr = networkOpToString(hr->op));
- // The hello/isMaster commands should take kMaxAwaitTimeMs at most, log if it takes twice
- // that.
- boost::optional<long long> slowMsOverride;
- if (auto command = CurOp::get(opCtx)->getCommand();
- command && (command->getName() == "hello" || command->getName() == "isMaster")) {
- slowMsOverride =
- 2 * durationCount<Milliseconds>(SingleServerDiscoveryMonitor::kMaxAwaitTime);
- }
+ auto dbResponse = runOperation();
- // Mark the op as complete, populate the response length, and log it if appropriate.
- CurOp::get(opCtx)->completeAndLogOperation(
- opCtx, logv2::LogComponent::kCommand, dbResponse.response.size(), slowMsOverride);
+ LOGV2_DEBUG(22868,
+ 3,
+ "Request::process end ns: {namespace} msg id: {msgId} op: {operation}",
+ "Done processing operation",
+ "namespace"_attr = nss,
+ "msgId"_attr = hr->msgId,
+ "operation"_attr = networkOpToString(hr->op));
- return Future<DbResponse>::makeReady(std::move(dbResponse));
- }
+ return Future<DbResponse>::makeReady(std::move(dbResponse));
+} catch (const DBException& ex) {
+ LOGV2_DEBUG(22869,
+ 1,
+ "Exception thrown while processing {operation} op for {namespace}: {error}",
+ "Got an error while processing operation",
+ "operation"_attr = networkOpToString(hr->op),
+ "namespace"_attr = hr->nsString.ns(),
+ "error"_attr = ex);
- NamespaceString nss;
DbResponse dbResponse;
- try {
- if (dbm.messageShouldHaveNs()) {
- nss = NamespaceString(StringData(dbm.getns()));
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid ns [" << nss.ns() << "]",
- nss.isValid());
-
- uassert(ErrorCodes::IllegalOperation,
- "Can't use 'local' database through mongos",
- nss.db() != NamespaceString::kLocalDb);
- }
-
-
- LOGV2_DEBUG(22867,
- 3,
- "Request::process begin ns: {namespace} msg id: {msgId} op: {operation}",
- "Starting operation",
- "namespace"_attr = nss,
- "msgId"_attr = msgId,
- "operation"_attr = networkOpToString(op));
-
- switch (op) {
- case dbQuery:
- // Commands are handled above through Strategy::clientCommand().
- invariant(!nss.isCommand());
- opCtx->markKillOnClientDisconnect();
- dbResponse = Strategy::queryOp(opCtx, nss, &dbm);
- break;
-
- case dbGetMore:
- dbResponse = Strategy::getMore(opCtx, nss, &dbm);
- break;
-
- case dbKillCursors:
- Strategy::killCursors(opCtx, &dbm); // No Response.
- break;
-
- case dbInsert:
- case dbUpdate:
- case dbDelete:
- Strategy::writeOp(opCtx, &dbm); // No Response.
- break;
-
- default:
- MONGO_UNREACHABLE;
- }
-
- LOGV2_DEBUG(22868,
- 3,
- "Request::process end ns: {namespace} msg id: {msgId} op: {operation}",
- "Done processing operation",
- "namespace"_attr = nss,
- "msgId"_attr = msgId,
- "operation"_attr = networkOpToString(op));
-
- } catch (const DBException& ex) {
- LOGV2_DEBUG(22869,
- 1,
- "Exception thrown while processing {operation} op for {namespace}: {error}",
- "Got an error while processing operation",
- "operation"_attr = networkOpToString(op),
- "namespace"_attr = nss.ns(),
- "error"_attr = ex);
-
- if (op == dbQuery || op == dbGetMore) {
- dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet);
- } else {
- // No Response.
- }
-
- // We *always* populate the last error for now
- LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.what());
- CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
+ if (hr->op == dbQuery || hr->op == dbGetMore) {
+ dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet);
+ } else {
+ // No Response.
+ }
+
+ // We *always* populate the last error for now
+ auto opCtx = hr->rec->getOpCtx();
+ LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.what());
+
+ CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
+
+ return Future<DbResponse>::makeReady(std::move(dbResponse));
+}
+
+struct QueryOpRunner final : public OpRunner {
+ using OpRunner::OpRunner;
+ DbResponse runOperation() override {
+ // Commands are handled through CommandOpRunner and Strategy::clientCommand().
+ invariant(!hr->nsString.isCommand());
+ hr->rec->getOpCtx()->markKillOnClientDisconnect();
+ return Strategy::queryOp(hr->rec->getOpCtx(), hr->nsString, &hr->rec->getDbMessage());
+ }
+};
+
+struct GetMoreOpRunner final : public OpRunner {
+ using OpRunner::OpRunner;
+ DbResponse runOperation() override {
+ return Strategy::getMore(hr->rec->getOpCtx(), hr->nsString, &hr->rec->getDbMessage());
+ }
+};
+
+struct KillCursorsOpRunner final : public OpRunner {
+ using OpRunner::OpRunner;
+ DbResponse runOperation() override {
+ Strategy::killCursors(hr->rec->getOpCtx(), &hr->rec->getDbMessage()); // No Response.
+ return {};
}
+};
+struct WriteOpRunner final : public OpRunner {
+ using OpRunner::OpRunner;
+ DbResponse runOperation() override {
+ Strategy::writeOp(hr->rec); // No Response.
+ return {};
+ }
+};
+
+Future<DbResponse> HandleRequest::handleRequest() {
+ switch (op) {
+ case dbQuery:
+ if (!nsString.isCommand())
+ return std::make_unique<QueryOpRunner>(shared_from_this())->run();
+ // FALLTHROUGH: it's a query containing a command
+ case dbMsg:
+ return std::make_unique<CommandOpRunner>(shared_from_this())->run();
+ case dbGetMore:
+ return std::make_unique<GetMoreOpRunner>(shared_from_this())->run();
+ case dbKillCursors:
+ return std::make_unique<KillCursorsOpRunner>(shared_from_this())->run();
+ case dbInsert:
+ case dbUpdate:
+ case dbDelete:
+ return std::make_unique<WriteOpRunner>(shared_from_this())->run();
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
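// handleRequest() above dispatches through one polymorphic runner per network
// op. A standalone sketch of the pattern with toy runner and op names (not
// mongo's types):
#include <iostream>
#include <memory>
#include <string>

struct RunnerBase {
    virtual ~RunnerBase() = default;
    virtual std::string run() = 0;
};

struct QueryRunner final : RunnerBase {
    std::string run() override { return "query response"; }
};

struct WriteRunner final : RunnerBase {
    std::string run() override { return "(no response)"; }
};

enum class Op { query, write };

std::unique_ptr<RunnerBase> makeRunner(Op op) {
    switch (op) {
        case Op::query:
            return std::make_unique<QueryRunner>();
        case Op::write:
            return std::make_unique<WriteRunner>();
    }
    return nullptr;  // unreachable for well-formed Op values
}

int main() {
    std::cout << makeRunner(Op::query)->run() << '\n';
    std::cout << makeRunner(Op::write)->run() << '\n';
}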
+
+void HandleRequest::onSuccess(const DbResponse& dbResponse) {
+ auto opCtx = rec->getOpCtx();
// Mark the op as complete, populate the response length, and log it if appropriate.
CurOp::get(opCtx)->completeAndLogOperation(
- opCtx, logv2::LogComponent::kCommand, dbResponse.response.size());
+ opCtx, logv2::LogComponent::kCommand, dbResponse.response.size(), slowMsOverride);
+}
- return Future<DbResponse>::makeReady(std::move(dbResponse));
-} catch (const DBException& e) {
- LOGV2(4879803, "Failed to handle request", "error"_attr = redact(e));
- return e.toStatus();
+Future<DbResponse> HandleRequest::run() {
+ auto fp = makePromiseFuture<void>();
+ auto future = std::move(fp.future)
+ .then([this, anchor = shared_from_this()] { setupEnvironment(); })
+ .then([this, anchor = shared_from_this()] { return handleRequest(); })
+ .tap([this, anchor = shared_from_this()](const DbResponse& dbResponse) {
+ onSuccess(dbResponse);
+ })
+ .tapError([](Status status) {
+ LOGV2(4879803, "Failed to handle request", "error"_attr = redact(status));
+ });
+ fp.promise.emplaceValue();
+ return future;
+}
+
+Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
+ const Message& message) noexcept {
+ auto hr = std::make_shared<HandleRequest>(opCtx, message);
+ return hr->run();
}
} // namespace mongo