author Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> 2020-11-17 17:07:44 +0000
committer Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> 2020-11-17 17:07:44 +0000
commit 13bb35d34dc20273d342e52360b89e6f510d1747 (patch)
tree b592cd7cc8c1edae83fe43a709945187041878c2
parent f4373b85f0f394c485cbff447312c567924ba5e3 (diff)
download mongo-13bb35d34dc20273d342e52360b89e6f510d1747.tar.gz
SERVER-51690 Futurize Mongos runCommand for async command execution
-rw-r--r-- src/mongo/s/commands/strategy.cpp 999
1 file changed, 561 insertions(+), 438 deletions(-)
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index d294ee93cf7..fdca7bf7ea6 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -188,10 +188,10 @@ void invokeInTransactionRouter(OperationContext* opCtx,
*/
void addContextForTransactionAbortingError(StringData txnIdAsString,
StmtId latestStmtId,
- DBException& ex,
+ Status& status,
StringData reason) {
- ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement "
- << latestStmtId << " due to: " << reason);
+ status.addContext("Transaction {} was aborted on statement {} due to: {}"_format(
+ txnIdAsString, latestStmtId, reason));
}
void execCommandClient(OperationContext* opCtx,
@@ -305,28 +305,135 @@ void execCommandClient(OperationContext* opCtx,
MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError);
/**
- * Executes the command for the given request, and appends the result to replyBuilder
- * and error labels, if any, to errorBuilder.
+ * Produces a future-chain that parses the command, runs the parsed command, and captures the result
+ * in replyBuilder.
*/
-Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
- std::shared_ptr<BSONObjBuilder> errorBuilder) try {
- auto opCtx = rec->getOpCtx();
- const auto& request = rec->getRequest();
- const auto& m = rec->getMessage();
- auto replyBuilder = rec->getReplyBuilder();
- auto const opType = m.operation();
- auto const commandName = request.getCommandName();
- auto const command = CommandHelpers::findCommand(commandName);
+class ParseAndRunCommand final : public std::enable_shared_from_this<ParseAndRunCommand> {
+public:
+ ParseAndRunCommand(const ParseAndRunCommand&) = delete;
+ ParseAndRunCommand(ParseAndRunCommand&&) = delete;
+
+ ParseAndRunCommand(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<BSONObjBuilder> errorBuilder)
+ : _rec(std::move(rec)),
+ _errorBuilder(std::move(errorBuilder)),
+ _opType(_rec->getMessage().operation()),
+ _commandName(_rec->getRequest().getCommandName()) {}
+
+ Future<void> run();
+
+private:
+ class RunInvocation;
+ class RunAndRetry;
+
+ // Prepares the environment for running the command (e.g., parsing the command to produce the
+ // invocation and extracting read/write concerns).
+ Status _prologue();
+
+    // Returns a future-chain that runs the parsed invocation.
+ Future<void> _runInvocation();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<BSONObjBuilder> _errorBuilder;
+ const NetworkOp _opType;
+ const StringData _commandName;
+
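+    // Populated by _prologue() and consumed by RunInvocation and RunAndRetry.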
+ std::shared_ptr<CommandInvocation> _invocation;
+ boost::optional<std::string> _ns;
+ boost::optional<OperationSessionInfoFromClient> _osi;
+ boost::optional<WriteConcernOptions> _wc;
+};
+
+/**
+ * Produces a future-chain to run the invocation and capture the result in replyBuilder.
+ */
+class ParseAndRunCommand::RunInvocation final
+ : public std::enable_shared_from_this<ParseAndRunCommand::RunInvocation> {
+public:
+ RunInvocation(RunInvocation&&) = delete;
+ RunInvocation(const RunInvocation&) = delete;
+
+ explicit RunInvocation(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {}
+
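+    // On destruction, records whether the operation was blocked by a catalog cache refresh.
+    // This only applies to commands that bump the command counter (see _setup()).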
+ ~RunInvocation() {
+ if (!_shouldAffectCommandCounter)
+ return;
+ auto opCtx = _parc->_rec->getOpCtx();
+ Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh(
+ opCtx, mongo::LogicalOp::opCommand);
+ }
+
+ Future<void> run();
+
+private:
+ Status _setup();
+
+ // Returns a future-chain that runs the invocation and retries if necessary.
+ Future<void> _runAndRetry();
+
+ // Logs and updates statistics if an error occurs during `_setup()` or `_runAndRetry()`.
+ void _tapOnError(const Status& status);
+
+ const std::shared_ptr<ParseAndRunCommand> _parc;
+
+ boost::optional<RouterOperationContextSession> _routerSession;
+ bool _shouldAffectCommandCounter = false;
+};
+
+/**
+ * Produces a future-chain that runs the invocation and retries if necessary.
+ */
+class ParseAndRunCommand::RunAndRetry final
+ : public std::enable_shared_from_this<ParseAndRunCommand::RunAndRetry> {
+public:
+ RunAndRetry(RunAndRetry&&) = delete;
+ RunAndRetry(const RunAndRetry&) = delete;
+
+ explicit RunAndRetry(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {}
+
+ Future<void> run();
+
+private:
+ bool _canRetry() const {
+ return _tries < kMaxNumStaleVersionRetries;
+ }
+
+ // Sets up the environment for running the invocation, and clears the state from the last try.
+ void _setup();
+
+ Future<void> _run();
+
+    // Exception handlers for error codes that may trigger a retry. Each handler rethrows
+    // `status` unless another retry attempt is possible.
+ void _checkRetryForTransaction(Status& status);
+ void _onShardInvalidatedForTargeting(Status& status);
+ void _onNeedRetargetting(Status& status);
+ void _onStaleDbVersion(Status& status);
+ void _onSnapshotError(Status& status);
+
+ const std::shared_ptr<ParseAndRunCommand> _parc;
+
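+    // Number of attempts so far; incremented at the start of each run().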
+ int _tries = 0;
+};
+
+Status ParseAndRunCommand::_prologue() {
+ auto opCtx = _rec->getOpCtx();
+ const auto& m = _rec->getMessage();
+ const auto& request = _rec->getRequest();
+ auto replyBuilder = _rec->getReplyBuilder();
+
+ auto const command = CommandHelpers::findCommand(_commandName);
if (!command) {
+ const std::string errorMsg = "no such cmd: {}"_format(_commandName);
auto builder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- builder,
- {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName});
+ CommandHelpers::appendCommandStatusNoThrow(builder,
+ {ErrorCodes::CommandNotFound, errorMsg});
globalCommandRegistry()->incrementUnknownCommands();
appendRequiredFieldsToResponse(opCtx, &builder);
- return Status::OK();
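+        // The error reply has already been recorded, so skip the rest of the chain;
+        // ParseAndRunCommand::run() translates SkipCommandExecution back into an OK future.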
+ return {ErrorCodes::SkipCommandExecution, errorMsg};
}
+ _rec->setCommand(command);
opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
const auto session = opCtx->getClient()->session();
if (session) {
@@ -360,30 +467,31 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
opCtx->setComment(commentField.wrap());
}
- std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
- CommandInvocation::set(opCtx, invocation);
+ _invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, _invocation);
// Set the logical optype, command object and namespace as soon as we identify the command. If
// the command does not define a fully-qualified namespace, set CurOp to the generic command
// namespace db.$cmd.
- std::string ns = invocation->ns().toString();
- auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns));
+ _ns.emplace(_invocation->ns().toString());
+ auto nss =
+ (request.getDatabase() == *_ns ? NamespaceString(*_ns, "$cmd") : NamespaceString(*_ns));
// Fill out all currentOp details.
- CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType);
+ CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, _opType);
- auto osi = initializeOperationSessionInfo(opCtx,
- request.body,
- command->requiresAuth(),
- command->attachLogicalSessionsToOpCtx(),
- true);
+ _osi.emplace(initializeOperationSessionInfo(opCtx,
+ request.body,
+ command->requiresAuth(),
+ command->attachLogicalSessionsToOpCtx(),
+ true));
// TODO SERVER-28756: Change allowTransactionsOnConfigDatabase to true once we fix the bug
// where the mongos custom write path incorrectly drops the client's txnNumber.
auto allowTransactionsOnConfigDatabase = false;
- validateSessionOptions(osi, command->getName(), nss, allowTransactionsOnConfigDatabase);
+ validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase);
- auto wc = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body));
+ _wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)));
Client* client = opCtx->getClient();
auto const apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
@@ -401,7 +509,7 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
if (!readConcernParseStatus.isOK()) {
auto builder = replyBuilder->getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus);
- return Status::OK();
+ return {ErrorCodes::SkipCommandExecution, "Failed to parse read concern"};
}
auto& apiParams = APIParameters::get(opCtx);
@@ -412,484 +520,499 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
apiVersionMetrics.update(appName, apiParams);
}
- boost::optional<RouterOperationContextSession> routerSession;
- try {
- rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
-
- CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
- bool startTransaction = false;
- if (osi.getAutocommit()) {
- routerSession.emplace(opCtx);
-
- auto txnRouter = TransactionRouter::get(opCtx);
- invariant(txnRouter);
+ return Status::OK();
+}
- auto txnNumber = opCtx->getTxnNumber();
- invariant(txnNumber);
+Status ParseAndRunCommand::RunInvocation::_setup() {
+ auto invocation = _parc->_invocation;
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto command = _parc->_rec->getCommand();
+ const auto& request = _parc->_rec->getRequest();
+ auto replyBuilder = _parc->_rec->getReplyBuilder();
- auto transactionAction = ([&] {
- auto startTxnSetting = osi.getStartTransaction();
- if (startTxnSetting && *startTxnSetting) {
- return TransactionRouter::TransactionActions::kStart;
- }
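+    // Failures below are appended to the reply and mapped to SkipCommandExecution, so the
+    // command is not executed but the client still receives a well-formed error response.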
+ auto appendStatusToReplyAndSkipCommandExecution = [replyBuilder](Status status) -> Status {
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(responseBuilder, status);
+ return Status(ErrorCodes::SkipCommandExecution, status.reason());
+ };
- if (command->getName() == CommitTransaction::kCommandName) {
- return TransactionRouter::TransactionActions::kCommit;
- }
+ rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
- return TransactionRouter::TransactionActions::kContinue;
- })();
+ CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
+ bool startTransaction = false;
+ if (_parc->_osi->getAutocommit()) {
+ _routerSession.emplace(opCtx);
- startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
- txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
- }
+ auto txnRouter = TransactionRouter::get(opCtx);
+ invariant(txnRouter);
- bool supportsWriteConcern = invocation->supportsWriteConcern();
- if (!supportsWriteConcern &&
- request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
- // This command doesn't do writes so it should not be passed a writeConcern.
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern"));
- return Status::OK();
- }
+ auto txnNumber = opCtx->getTxnNumber();
+ invariant(txnNumber);
- bool clientSuppliedWriteConcern = !wc.usedDefault;
- bool customDefaultWriteConcernWasApplied = false;
-
- if (supportsWriteConcern && !clientSuppliedWriteConcern &&
- (!TransactionRouter::get(opCtx) || isTransactionCommand(commandName))) {
- // This command supports WC, but wasn't given one - so apply the default, if there is
- // one.
- if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
- .getDefaultWriteConcern(opCtx)) {
- wc = *wcDefault;
- customDefaultWriteConcernWasApplied = true;
- LOGV2_DEBUG(22766,
- 2,
- "Applying default writeConcern on {command} of {writeConcern}",
- "Applying default writeConcern on command",
- "command"_attr = request.getCommandName(),
- "writeConcern"_attr = *wcDefault);
+ auto transactionAction = ([&] {
+ auto startTxnSetting = _parc->_osi->getStartTransaction();
+ if (startTxnSetting && *startTxnSetting) {
+ return TransactionRouter::TransactionActions::kStart;
}
- }
-
- if (TransactionRouter::get(opCtx)) {
- validateWriteConcernForTransaction(wc, commandName);
- }
- if (supportsWriteConcern) {
- auto& provenance = wc.getProvenance();
-
- // ClientSupplied is the only provenance that clients are allowed to pass to mongos.
- if (provenance.hasSource() && !provenance.isClientSupplied()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status{ErrorCodes::InvalidOptions,
- "writeConcern provenance must be unset or \"{}\""_format(
- ReadWriteConcernProvenance::kClientSupplied)});
- return Status::OK();
+ if (command->getName() == CommitTransaction::kCommandName) {
+ return TransactionRouter::TransactionActions::kCommit;
}
- // If the client didn't provide a provenance, then an appropriate value needs to be
- // determined.
- if (!provenance.hasSource()) {
- if (clientSuppliedWriteConcern) {
- provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
- } else if (customDefaultWriteConcernWasApplied) {
- provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
- } else {
- provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
- }
- }
+ return TransactionRouter::TransactionActions::kContinue;
+ })();
- // Ensure that the WC being set on the opCtx has provenance.
- invariant(wc.getProvenance().hasSource(),
- str::stream()
- << "unexpected unset provenance on writeConcern: " << wc.toBSON());
+ startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
+ }
- opCtx->setWriteConcern(wc);
- }
+ bool supportsWriteConcern = invocation->supportsWriteConcern();
+ if (!supportsWriteConcern && request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
+ // This command doesn't do writes so it should not be passed a writeConcern.
+ const auto errorMsg = "Command does not support writeConcern";
+ return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg});
+ }
- bool clientSuppliedReadConcern = readConcernArgs.isSpecified();
- bool customDefaultReadConcernWasApplied = false;
-
- auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
- if (readConcernSupport.defaultReadConcernPermit.isOK() &&
- (startTransaction || !TransactionRouter::get(opCtx))) {
- if (readConcernArgs.isEmpty()) {
- const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
- .getDefaultReadConcern(opCtx);
- if (rcDefault) {
- {
- // We must obtain the client lock to set ReadConcernArgs, because it's an
- // in-place reference to the object on the operation context, which may be
- // concurrently used elsewhere (eg. read by currentOp).
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = std::move(*rcDefault);
- }
- customDefaultReadConcernWasApplied = true;
- LOGV2_DEBUG(22767,
- 2,
- "Applying default readConcern on {command} of {readConcern}",
- "Applying default readConcern on command",
- "command"_attr = invocation->definition()->getName(),
- "readConcern"_attr = *rcDefault);
- // Update the readConcernSupport, since the default RC was applied.
- readConcernSupport =
- invocation->supportsReadConcern(readConcernArgs.getLevel());
- }
- }
+ bool clientSuppliedWriteConcern = !_parc->_wc->usedDefault;
+ bool customDefaultWriteConcernWasApplied = false;
+
+ if (supportsWriteConcern && !clientSuppliedWriteConcern &&
+ (!TransactionRouter::get(opCtx) || isTransactionCommand(_parc->_commandName))) {
+ // This command supports WC, but wasn't given one - so apply the default, if there is one.
+ if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
+ .getDefaultWriteConcern(opCtx)) {
+ _parc->_wc = *wcDefault;
+ customDefaultWriteConcernWasApplied = true;
+ LOGV2_DEBUG(22766,
+ 2,
+ "Applying default writeConcern on {command} of {writeConcern}",
+ "Applying default writeConcern on command",
+ "command"_attr = request.getCommandName(),
+ "writeConcern"_attr = *wcDefault);
}
+ }
- auto& provenance = readConcernArgs.getProvenance();
+ if (TransactionRouter::get(opCtx)) {
+ validateWriteConcernForTransaction(*_parc->_wc, _parc->_commandName);
+ }
+
+ if (supportsWriteConcern) {
+ auto& provenance = _parc->_wc->getProvenance();
// ClientSupplied is the only provenance that clients are allowed to pass to mongos.
if (provenance.hasSource() && !provenance.isClientSupplied()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- Status{ErrorCodes::InvalidOptions,
- "readConcern provenance must be unset or \"{}\""_format(
- ReadWriteConcernProvenance::kClientSupplied)});
- return Status::OK();
+ const auto errorMsg = "writeConcern provenance must be unset or \"{}\""_format(
+ ReadWriteConcernProvenance::kClientSupplied);
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
}
// If the client didn't provide a provenance, then an appropriate value needs to be
// determined.
if (!provenance.hasSource()) {
- // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs
- // as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- if (clientSuppliedReadConcern) {
+ if (clientSuppliedWriteConcern) {
provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
- } else if (customDefaultReadConcernWasApplied) {
+ } else if (customDefaultWriteConcernWasApplied) {
provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
} else {
provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
}
}
- // Ensure that the RC on the opCtx has provenance.
- invariant(readConcernArgs.getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on readConcern: "
- << readConcernArgs.toBSONInner());
-
- // If we are starting a transaction, we only need to check whether the read concern is
- // appropriate for running a transaction. There is no need to check whether the specific
- // command supports the read concern, because all commands that are allowed to run in a
- // transaction must support all applicable read concerns.
- if (startTransaction) {
- if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- {ErrorCodes::InvalidOptions,
- "The readConcern level must be either 'local' (default), 'majority' or "
- "'snapshot' in order to run in a transaction"});
- return Status::OK();
- }
- if (readConcernArgs.getArgsOpTime()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- {ErrorCodes::InvalidOptions,
- str::stream()
- << "The readConcern cannot specify '"
- << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"});
- return Status::OK();
- }
- }
+ // Ensure that the WC being set on the opCtx has provenance.
+ invariant(_parc->_wc->getProvenance().hasSource(),
+ "unexpected unset provenance on writeConcern: {}"_format(
+ _parc->_wc->toBSON().toString()));
+
+ opCtx->setWriteConcern(*_parc->_wc);
+ }
- // Otherwise, if there is a read concern present - either user-specified or the default -
- // then check whether the command supports it. If there is no explicit read concern level,
- // then it is implicitly "local". There is no need to check whether this is supported,
- // because all commands either support "local" or upconvert the absent readConcern to a
- // stronger level that they do support; e.g. $changeStream upconverts to RC "majority".
- //
- // Individual transaction statements are checked later on, after we've unstashed the
- // transaction resources.
- if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) {
- if (!readConcernSupport.readConcernSupport.isOK()) {
- auto responseBuilder = replyBuilder->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(
- responseBuilder,
- readConcernSupport.readConcernSupport.withContext(
- str::stream() << "Command " << invocation->definition()->getName()
- << " does not support " << readConcernArgs.toString()));
- return Status::OK();
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ bool clientSuppliedReadConcern = readConcernArgs.isSpecified();
+ bool customDefaultReadConcernWasApplied = false;
+
+ auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
+ if (readConcernSupport.defaultReadConcernPermit.isOK() &&
+ (startTransaction || !TransactionRouter::get(opCtx))) {
+ if (readConcernArgs.isEmpty()) {
+ const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext())
+ .getDefaultReadConcern(opCtx);
+ if (rcDefault) {
+ {
+ // We must obtain the client lock to set ReadConcernArgs, because it's an
+ // in-place reference to the object on the operation context, which may be
+ // concurrently used elsewhere (eg. read by currentOp).
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = std::move(*rcDefault);
+ }
+ customDefaultReadConcernWasApplied = true;
+ LOGV2_DEBUG(22767,
+ 2,
+ "Applying default readConcern on {command} of {readConcern}",
+ "Applying default readConcern on command",
+ "command"_attr = invocation->definition()->getName(),
+ "readConcern"_attr = *rcDefault);
+ // Update the readConcernSupport, since the default RC was applied.
+ readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel());
}
}
+ }
- // Remember whether or not this operation is starting a transaction, in case something later
- // in the execution needs to adjust its behavior based on this.
- opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
+ auto& provenance = readConcernArgs.getProvenance();
- command->incrementCommandsExecuted();
+ // ClientSupplied is the only provenance that clients are allowed to pass to mongos.
+ if (provenance.hasSource() && !provenance.isClientSupplied()) {
+ const auto errorMsg = "readConcern provenance must be unset or \"{}\""_format(
+ ReadWriteConcernProvenance::kClientSupplied);
+ return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg});
+ }
- auto shouldAffectCommandCounter = command->shouldAffectCommandCounter();
+ // If the client didn't provide a provenance, then an appropriate value needs to be determined.
+ if (!provenance.hasSource()) {
+ // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs as it
+ // may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ if (clientSuppliedReadConcern) {
+ provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied);
+ } else if (customDefaultReadConcernWasApplied) {
+ provenance.setSource(ReadWriteConcernProvenance::Source::customDefault);
+ } else {
+ provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault);
+ }
+ }
- if (shouldAffectCommandCounter) {
- globalOpCounters.gotCommand();
+ // Ensure that the RC on the opCtx has provenance.
+ invariant(readConcernArgs.getProvenance().hasSource(),
+ "unexpected unset provenance on readConcern: {}"_format(
+ readConcernArgs.toBSONInner().toString()));
+
+ // If we are starting a transaction, we only need to check whether the read concern is
+ // appropriate for running a transaction. There is no need to check whether the specific command
+ // supports the read concern, because all commands that are allowed to run in a transaction must
+ // support all applicable read concerns.
+ if (startTransaction) {
+ if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) {
+ const auto errorMsg =
+ "The readConcern level must be either 'local' (default), 'majority' or "
+ "'snapshot' in order to run in a transaction";
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
}
+ if (readConcernArgs.getArgsOpTime()) {
+ const std::string errorMsg =
+ "The readConcern cannot specify '{}' in a transaction"_format(
+ repl::ReadConcernArgs::kAfterOpTimeFieldName);
+ return appendStatusToReplyAndSkipCommandExecution(
+ {ErrorCodes::InvalidOptions, errorMsg});
+ }
+ }
- ON_BLOCK_EXIT([opCtx, shouldAffectCommandCounter] {
- if (shouldAffectCommandCounter) {
- Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh(
- opCtx, mongo::LogicalOp::opCommand);
- }
- });
+ // Otherwise, if there is a read concern present - either user-specified or the default - then
+ // check whether the command supports it. If there is no explicit read concern level, then it is
+ // implicitly "local". There is no need to check whether this is supported, because all commands
+ // either support "local" or upconvert the absent readConcern to a stronger level that they do
+ // support; e.g. $changeStream upconverts to RC "majority".
+ //
+ // Individual transaction statements are checked later on, after we've unstashed the transaction
+ // resources.
+ if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) {
+ if (!readConcernSupport.readConcernSupport.isOK()) {
+ const std::string errorMsg = "Command {} does not support {}"_format(
+ invocation->definition()->getName(), readConcernArgs.toString());
+ return appendStatusToReplyAndSkipCommandExecution(
+ readConcernSupport.readConcernSupport.withContext(errorMsg));
+ }
+ }
+ // Remember whether or not this operation is starting a transaction, in case something later in
+ // the execution needs to adjust its behavior based on this.
+ opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
- for (int tries = 0;; ++tries) {
- // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
- bool canRetry = tries < kMaxNumStaleVersionRetries - 1;
+ command->incrementCommandsExecuted();
- if (tries > 0) {
- // Re-parse before retrying in case the process of run()-ning the
- // invocation could affect the parsed result.
- invocation = command->parse(opCtx, request);
- invariant(invocation->ns().toString() == ns,
- "unexpected change of namespace when retrying");
- }
+ if (command->shouldAffectCommandCounter()) {
+ globalOpCounters.gotCommand();
+ _shouldAffectCommandCounter = true;
+ }
- // On each try, select the latest known clusterTime as the atClusterTime for snapshot
- // reads outside of transactions.
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
- !TransactionRouter::get(opCtx) &&
- (!readConcernArgs.getArgsAtClusterTime() ||
- readConcernArgs.wasAtClusterTimeSelected())) {
- auto atClusterTime = [&] {
- auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime();
- // Choose a time after the user-supplied afterClusterTime.
- auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
- if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) {
- return afterClusterTime->asTimestamp();
- }
- return latestKnownClusterTime.asTimestamp();
- }();
- readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
+ return Status::OK();
+}
+
+void ParseAndRunCommand::RunAndRetry::_setup() {
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto command = _parc->_rec->getCommand();
+ const auto& request = _parc->_rec->getRequest();
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+
+ if (_tries > 1) {
+ // Re-parse before retrying in case the process of run()-ning the invocation could affect
+ // the parsed result.
+ _parc->_invocation = command->parse(opCtx, request);
+ invariant(_parc->_invocation->ns().toString() == _parc->_ns,
+ "unexpected change of namespace when retrying");
+ }
+
+ // On each try, select the latest known clusterTime as the atClusterTime for snapshot reads
+ // outside of transactions.
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !TransactionRouter::get(opCtx) &&
+ (!readConcernArgs.getArgsAtClusterTime() || readConcernArgs.wasAtClusterTimeSelected())) {
+ auto atClusterTime = [](OperationContext* opCtx, ReadConcernArgs& readConcernArgs) {
+ auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime();
+ // Choose a time after the user-supplied afterClusterTime.
+ auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) {
+ return afterClusterTime->asTimestamp();
}
+ return latestKnownClusterTime.asTimestamp();
+ }(opCtx, readConcernArgs);
+ readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
+ }
- replyBuilder->reset();
- try {
- execCommandClient(opCtx, invocation.get(), request, replyBuilder);
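+    // Clear any partial reply left over from the previous attempt.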
+ _parc->_rec->getReplyBuilder()->reset();
+}
- auto responseBuilder = replyBuilder->getBodyBuilder();
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter.appendRecoveryToken(&responseBuilder);
- }
+Future<void> ParseAndRunCommand::RunAndRetry::_run() try {
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto replyBuilder = _parc->_rec->getReplyBuilder();
- return Status::OK();
- } catch (ShardInvalidatedForTargetingException& ex) {
- auto catalogCache = Grid::get(opCtx)->catalogCache();
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+ execCommandClient(opCtx, _parc->_invocation.get(), _parc->_rec->getRequest(), replyBuilder);
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- (void)catalogCache->getCollectionRoutingInfoWithRefresh(
- opCtx, ex.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss());
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto responseBuilder = replyBuilder->getBodyBuilder();
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.appendRecoveryToken(&responseBuilder);
+ }
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ return Future<void>::makeReady(Status::OK());
+} catch (const DBException& ex) {
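+    // Convert exceptions thrown by execCommandClient into a failed future so the onError
+    // handlers attached in run() can observe them and possibly retry.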
+ return ex.toStatus();
+}
- abortGuard.dismiss();
- continue;
- }
+void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) {
+ // Retry logic specific to transactions. Throws and aborts the transaction if the error cannot
+ // be retried on.
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (!txnRouter)
+ return;
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) {
- const auto staleNs = [&] {
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- return staleInfo->getNss();
- }
- throw;
- }();
+ auto abortGuard = makeGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); });
+
+ if (!_canRetry()) {
+ addContextForTransactionAbortingError(
+ txnRouter.txnIdToString(), txnRouter.getLatestStmtId(), status, "exhausted retries");
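+        // internalAssert() rethrows the now-contextualized status, failing this attempt.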
+ internalAssert(status);
+ }
+ // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, or shard
+ // invalidated for targeting errors.
+ if (ErrorCodes::isA<ErrorCategory::SnapshotError>(status)) {
+ if (!txnRouter.canContinueOnSnapshotError()) {
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ status,
+ "a non-retryable snapshot error");
+ internalAssert(status);
+ }
+
+ // The error is retryable, so update transaction state before retrying.
+ txnRouter.onSnapshotError(opCtx, status);
+ } else {
+ invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status) ||
+ status.code() == ErrorCodes::ShardInvalidatedForTargeting ||
+ status.code() == ErrorCodes::StaleDbVersion);
+
+ if (!txnRouter.canContinueOnStaleShardOrDbError(_parc->_commandName, status)) {
+ if (status.code() == ErrorCodes::ShardInvalidatedForTargeting) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
- if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
- catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
- } else {
- // If we don't have the stale config info and therefore don't know the shard's
- // id, we have to force all further targetting requests for the namespace to
- // block on a refresh.
- catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs);
- }
+ (void)catalogCache->getCollectionRoutingInfoWithRefresh(
+ opCtx, status.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss());
+ }
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ status,
+ "an error from cluster data placement change");
+ internalAssert(status);
+ }
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ // The error is retryable, so update transaction state before retrying.
+ txnRouter.onStaleShardOrDbError(opCtx, _parc->_commandName, status);
+ }
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
+ abortGuard.dismiss();
+}
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+void ParseAndRunCommand::RunAndRetry::_onShardInvalidatedForTargeting(Status& status) {
+ invariant(status.code() == ErrorCodes::ShardInvalidatedForTargeting);
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto opCtx = _parc->_rec->getOpCtx();
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ _checkRetryForTransaction(status);
- abortGuard.dismiss();
- continue;
- }
+ if (!_canRetry())
+ internalAssert(status);
+}
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
- // Mark database entry in cache as stale.
- Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(),
- ex->getVersionWanted());
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) {
+ invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status));
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) {
- addContextForTransactionAbortingError(
- txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "an error from cluster data placement change");
- throw;
- }
+ auto staleInfo = status.extraInfo<StaleConfigInfo>();
+ if (!staleInfo)
+ internalAssert(status);
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto staleNs = staleInfo->getNss();
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
- abortGuard.dismiss();
- continue;
- }
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- if (canRetry) {
- continue;
- }
- throw;
- } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
- // Simple retry on any type of snapshot error.
-
- // Retry logic specific to transactions. Throws and aborts the transaction if the
- // error cannot be retried on.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- auto abortGuard = makeGuard(
- [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
-
- if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "exhausted retries");
- throw;
- }
+ _checkRetryForTransaction(status);
- // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot,
- // or shard invalidated for targeting errors.
- if (!txnRouter.canContinueOnSnapshotError()) {
- addContextForTransactionAbortingError(txnRouter.txnIdToString(),
- txnRouter.getLatestStmtId(),
- ex,
- "a non-retryable snapshot error");
- throw;
- }
+ if (!_canRetry())
+ internalAssert(status);
+}
- // The error is retryable, so update transaction state before retrying.
- txnRouter.onSnapshotError(opCtx, ex.toStatus());
+void ParseAndRunCommand::RunAndRetry::_onStaleDbVersion(Status& status) {
+ invariant(status.code() == ErrorCodes::StaleDbVersion);
+ auto opCtx = _parc->_rec->getOpCtx();
- abortGuard.dismiss();
- continue;
- } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) {
- // Non-transaction snapshot read. The client sent readConcern: {level:
- // "snapshot", atClusterTime: T}, where T is older than
- // minSnapshotHistoryWindowInSeconds, retrying won't succeed.
- throw;
- }
+ // Mark database entry in cache as stale.
+ auto extraInfo = status.extraInfo<StaleDbRoutingVersion>();
+ invariant(extraInfo);
+ Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(extraInfo->getDb(),
+ extraInfo->getVersionWanted());
- if (canRetry) {
- continue;
- }
- throw;
- }
- MONGO_UNREACHABLE;
- }
- } catch (const DBException& e) {
- command->incrementCommandsFailed();
- LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason());
- // WriteConcern error (wcCode) is set to boost::none because:
- // 1. TransientTransaction error label handling for commitTransaction command in mongos is
- // delegated to the shards. Mongos simply propagates the shard's response up to the
- // client.
- // 2. For other commands in a transaction, they shouldn't get a writeConcern error so
- // this setting doesn't apply.
- //
- // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError
- // label.
- auto errorLabels = getErrorLabels(
- opCtx, osi, command->getName(), e.code(), boost::none, true /* isInternalClient */);
- errorBuilder->appendElements(errorLabels);
- throw;
+ _checkRetryForTransaction(status);
+
+ if (!_canRetry())
+ internalAssert(status);
+}
+
+void ParseAndRunCommand::RunAndRetry::_onSnapshotError(Status& status) {
+ // Simple retry on any type of snapshot error.
+ invariant(ErrorCodes::isA<ErrorCategory::SnapshotError>(status));
+
+ _checkRetryForTransaction(status);
+
+ auto opCtx = _parc->_rec->getOpCtx();
+ if (auto txnRouter = TransactionRouter::get(opCtx);
+ !txnRouter && !ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) {
+ // Non-transaction snapshot read. The client sent readConcern: {level: "snapshot",
+ // atClusterTime: T}, where T is older than minSnapshotHistoryWindowInSeconds, retrying
+ // won't succeed.
+ internalAssert(status);
}
- return Status::OK();
-} catch (const DBException& e) {
- return e.toStatus();
+
+ if (!_canRetry())
+ internalAssert(status);
+}
+
+void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) {
+ auto opCtx = _parc->_rec->getOpCtx();
+ const auto command = _parc->_rec->getCommand();
+
+ command->incrementCommandsFailed();
+ LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason());
+ // WriteConcern error (wcCode) is set to boost::none because:
+ // 1. TransientTransaction error label handling for commitTransaction command in mongos is
+ // delegated to the shards. Mongos simply propagates the shard's response up to the client.
+ // 2. For other commands in a transaction, they shouldn't get a writeConcern error so this
+ // setting doesn't apply.
+ //
+ // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError
+ // label.
+ auto errorLabels = getErrorLabels(opCtx,
+ *_parc->_osi,
+ command->getName(),
+ status.code(),
+ boost::none,
+ true /* isInternalClient */);
+ _parc->_errorBuilder->appendElements(errorLabels);
+}
+
+Future<void> ParseAndRunCommand::RunInvocation::_runAndRetry() {
+ auto instance = std::make_shared<RunAndRetry>(_parc);
+ return instance->run();
+}
+
+Future<void> ParseAndRunCommand::_runInvocation() {
+ auto ri = std::make_shared<RunInvocation>(shared_from_this());
+ return ri->run();
+}
+
+Future<void> ParseAndRunCommand::RunAndRetry::run() {
+ // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
+ _tries++;
+
+ auto pf = makePromiseFuture<void>();
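+    // The continuations below are wired onto an unfulfilled promise and kicked off by the
+    // emplaceValue() call at the end; each onError handler either rethrows via
+    // internalAssert() or recursively calls run() to retry.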
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { _setup(); })
+ .then([this, anchor = shared_from_this()] {
+ return _run()
+ .onError<ErrorCodes::ShardInvalidatedForTargeting>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onShardInvalidatedForTargeting(status);
+ return run(); // Retry
+ })
+ .onErrorCategory<ErrorCategory::NeedRetargettingError>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onNeedRetargetting(status);
+ return run(); // Retry
+ })
+ .onError<ErrorCodes::StaleDbVersion>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onStaleDbVersion(status);
+ return run(); // Retry
+ })
+ .onErrorCategory<ErrorCategory::SnapshotError>(
+ [this, anchor = shared_from_this()](Status status) {
+ _onSnapshotError(status);
+ return run(); // Retry
+ });
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+Future<void> ParseAndRunCommand::RunInvocation::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future =
+ std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _setup(); })
+ .then([this, anchor = shared_from_this()] { return _runAndRetry(); })
+ .tapError([this, anchor = shared_from_this()](Status status) { _tapOnError(status); });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+Future<void> ParseAndRunCommand::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _prologue(); })
+ .then([this, anchor = shared_from_this()] { return _runInvocation(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ if (status.code() == ErrorCodes::SkipCommandExecution)
+ // We've already skipped execution, so no other action is required.
+ return Status::OK();
+ return status;
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+/**
+ * Executes the command for the given request, and appends the result to replyBuilder
+ * and error labels, if any, to errorBuilder.
+ */
+Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<BSONObjBuilder> errorBuilder) {
+ auto instance = std::make_shared<ParseAndRunCommand>(std::move(rec), std::move(errorBuilder));
+ return instance->run();
}
} // namespace