diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-17 17:07:44 +0000 |
---|---|---|
committer | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-17 17:07:44 +0000 |
commit | 13bb35d34dc20273d342e52360b89e6f510d1747 (patch) | |
tree | b592cd7cc8c1edae83fe43a709945187041878c2 | |
parent | f4373b85f0f394c485cbff447312c567924ba5e3 (diff) | |
download | mongo-13bb35d34dc20273d342e52360b89e6f510d1747.tar.gz |
SERVER-51690 Futurize Mongos runCommand for async command execution
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 999 |
1 files changed, 561 insertions, 438 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index d294ee93cf7..fdca7bf7ea6 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -188,10 +188,10 @@ void invokeInTransactionRouter(OperationContext* opCtx, */ void addContextForTransactionAbortingError(StringData txnIdAsString, StmtId latestStmtId, - DBException& ex, + Status& status, StringData reason) { - ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement " - << latestStmtId << " due to: " << reason); + status.addContext("Transaction {} was aborted on statement {} due to: {}"_format( + txnIdAsString, latestStmtId, reason)); } void execCommandClient(OperationContext* opCtx, @@ -305,28 +305,135 @@ void execCommandClient(OperationContext* opCtx, MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError); /** - * Executes the command for the given request, and appends the result to replyBuilder - * and error labels, if any, to errorBuilder. + * Produces a future-chain that parses the command, runs the parsed command, and captures the result + * in replyBuilder. */ -Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, - std::shared_ptr<BSONObjBuilder> errorBuilder) try { - auto opCtx = rec->getOpCtx(); - const auto& request = rec->getRequest(); - const auto& m = rec->getMessage(); - auto replyBuilder = rec->getReplyBuilder(); - auto const opType = m.operation(); - auto const commandName = request.getCommandName(); - auto const command = CommandHelpers::findCommand(commandName); +class ParseAndRunCommand final : public std::enable_shared_from_this<ParseAndRunCommand> { +public: + ParseAndRunCommand(const ParseAndRunCommand&) = delete; + ParseAndRunCommand(ParseAndRunCommand&&) = delete; + + ParseAndRunCommand(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<BSONObjBuilder> errorBuilder) + : _rec(std::move(rec)), + _errorBuilder(std::move(errorBuilder)), + _opType(_rec->getMessage().operation()), + _commandName(_rec->getRequest().getCommandName()) {} + + Future<void> run(); + +private: + class RunInvocation; + class RunAndRetry; + + // Prepares the environment for running the command (e.g., parsing the command to produce the + // invocation and extracting read/write concerns). + Status _prologue(); + + // Returns a future-chain that runs the parse invocation. + Future<void> _runInvocation(); + + const std::shared_ptr<RequestExecutionContext> _rec; + const std::shared_ptr<BSONObjBuilder> _errorBuilder; + const NetworkOp _opType; + const StringData _commandName; + + std::shared_ptr<CommandInvocation> _invocation; + boost::optional<std::string> _ns; + boost::optional<OperationSessionInfoFromClient> _osi; + boost::optional<WriteConcernOptions> _wc; +}; + +/* + * Produces a future-chain to run the invocation and capture the result in replyBuilder. + */ +class ParseAndRunCommand::RunInvocation final + : public std::enable_shared_from_this<ParseAndRunCommand::RunInvocation> { +public: + RunInvocation(RunInvocation&&) = delete; + RunInvocation(const RunInvocation&) = delete; + + explicit RunInvocation(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {} + + ~RunInvocation() { + if (!_shouldAffectCommandCounter) + return; + auto opCtx = _parc->_rec->getOpCtx(); + Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh( + opCtx, mongo::LogicalOp::opCommand); + } + + Future<void> run(); + +private: + Status _setup(); + + // Returns a future-chain that runs the invocation and retries if necessary. + Future<void> _runAndRetry(); + + // Logs and updates statistics if an error occurs during `_setup()` or `_runAndRetry()`. + void _tapOnError(const Status& status); + + const std::shared_ptr<ParseAndRunCommand> _parc; + + boost::optional<RouterOperationContextSession> _routerSession; + bool _shouldAffectCommandCounter = false; +}; + +/* + * Produces a future-chain that runs the invocation and retries if necessary. + */ +class ParseAndRunCommand::RunAndRetry final + : public std::enable_shared_from_this<ParseAndRunCommand::RunAndRetry> { +public: + RunAndRetry(RunAndRetry&&) = delete; + RunAndRetry(const RunAndRetry&) = delete; + + explicit RunAndRetry(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {} + + Future<void> run(); + +private: + bool _canRetry() const { + return _tries < kMaxNumStaleVersionRetries; + } + + // Sets up the environment for running the invocation, and clears the state from the last try. + void _setup(); + + Future<void> _run(); + + // Exception handler for error codes that may trigger a retry. All methods will throw `status` + // unless an attempt to retry is possible. + void _checkRetryForTransaction(Status& status); + void _onShardInvalidatedForTargeting(Status& status); + void _onNeedRetargetting(Status& status); + void _onStaleDbVersion(Status& status); + void _onSnapshotError(Status& status); + + const std::shared_ptr<ParseAndRunCommand> _parc; + + int _tries = 0; +}; + +Status ParseAndRunCommand::_prologue() { + auto opCtx = _rec->getOpCtx(); + const auto& m = _rec->getMessage(); + const auto& request = _rec->getRequest(); + auto replyBuilder = _rec->getReplyBuilder(); + + auto const command = CommandHelpers::findCommand(_commandName); if (!command) { + const std::string errorMsg = "no such cmd: {}"_format(_commandName); auto builder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - builder, - {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName}); + CommandHelpers::appendCommandStatusNoThrow(builder, + {ErrorCodes::CommandNotFound, errorMsg}); globalCommandRegistry()->incrementUnknownCommands(); appendRequiredFieldsToResponse(opCtx, &builder); - return Status::OK(); + return {ErrorCodes::SkipCommandExecution, errorMsg}; } + _rec->setCommand(command); opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); const auto session = opCtx->getClient()->session(); if (session) { @@ -360,30 +467,31 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, opCtx->setComment(commentField.wrap()); } - std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request); - CommandInvocation::set(opCtx, invocation); + _invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, _invocation); // Set the logical optype, command object and namespace as soon as we identify the command. If // the command does not define a fully-qualified namespace, set CurOp to the generic command // namespace db.$cmd. - std::string ns = invocation->ns().toString(); - auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns)); + _ns.emplace(_invocation->ns().toString()); + auto nss = + (request.getDatabase() == *_ns ? NamespaceString(*_ns, "$cmd") : NamespaceString(*_ns)); // Fill out all currentOp details. - CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType); + CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, _opType); - auto osi = initializeOperationSessionInfo(opCtx, - request.body, - command->requiresAuth(), - command->attachLogicalSessionsToOpCtx(), - true); + _osi.emplace(initializeOperationSessionInfo(opCtx, + request.body, + command->requiresAuth(), + command->attachLogicalSessionsToOpCtx(), + true)); // TODO SERVER-28756: Change allowTransactionsOnConfigDatabase to true once we fix the bug // where the mongos custom write path incorrectly drops the client's txnNumber. auto allowTransactionsOnConfigDatabase = false; - validateSessionOptions(osi, command->getName(), nss, allowTransactionsOnConfigDatabase); + validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase); - auto wc = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)); + _wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body))); Client* client = opCtx->getClient(); auto const apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); @@ -401,7 +509,7 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, if (!readConcernParseStatus.isOK()) { auto builder = replyBuilder->getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus); - return Status::OK(); + return {ErrorCodes::SkipCommandExecution, "Failed to parse read concern"}; } auto& apiParams = APIParameters::get(opCtx); @@ -412,484 +520,499 @@ Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, apiVersionMetrics.update(appName, apiParams); } - boost::optional<RouterOperationContextSession> routerSession; - try { - rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - - CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); - bool startTransaction = false; - if (osi.getAutocommit()) { - routerSession.emplace(opCtx); - - auto txnRouter = TransactionRouter::get(opCtx); - invariant(txnRouter); + return Status::OK(); +} - auto txnNumber = opCtx->getTxnNumber(); - invariant(txnNumber); +Status ParseAndRunCommand::RunInvocation::_setup() { + auto invocation = _parc->_invocation; + auto opCtx = _parc->_rec->getOpCtx(); + auto command = _parc->_rec->getCommand(); + const auto& request = _parc->_rec->getRequest(); + auto replyBuilder = _parc->_rec->getReplyBuilder(); - auto transactionAction = ([&] { - auto startTxnSetting = osi.getStartTransaction(); - if (startTxnSetting && *startTxnSetting) { - return TransactionRouter::TransactionActions::kStart; - } + auto appendStatusToReplyAndSkipCommandExecution = [replyBuilder](Status status) -> Status { + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow(responseBuilder, status); + return Status(ErrorCodes::SkipCommandExecution, status.reason()); + }; - if (command->getName() == CommitTransaction::kCommandName) { - return TransactionRouter::TransactionActions::kCommit; - } + rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - return TransactionRouter::TransactionActions::kContinue; - })(); + CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); + bool startTransaction = false; + if (_parc->_osi->getAutocommit()) { + _routerSession.emplace(opCtx); - startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); - txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); - } + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); - bool supportsWriteConcern = invocation->supportsWriteConcern(); - if (!supportsWriteConcern && - request.body.hasField(WriteConcernOptions::kWriteConcernField)) { - // This command doesn't do writes so it should not be passed a writeConcern. - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern")); - return Status::OK(); - } + auto txnNumber = opCtx->getTxnNumber(); + invariant(txnNumber); - bool clientSuppliedWriteConcern = !wc.usedDefault; - bool customDefaultWriteConcernWasApplied = false; - - if (supportsWriteConcern && !clientSuppliedWriteConcern && - (!TransactionRouter::get(opCtx) || isTransactionCommand(commandName))) { - // This command supports WC, but wasn't given one - so apply the default, if there is - // one. - if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) - .getDefaultWriteConcern(opCtx)) { - wc = *wcDefault; - customDefaultWriteConcernWasApplied = true; - LOGV2_DEBUG(22766, - 2, - "Applying default writeConcern on {command} of {writeConcern}", - "Applying default writeConcern on command", - "command"_attr = request.getCommandName(), - "writeConcern"_attr = *wcDefault); + auto transactionAction = ([&] { + auto startTxnSetting = _parc->_osi->getStartTransaction(); + if (startTxnSetting && *startTxnSetting) { + return TransactionRouter::TransactionActions::kStart; } - } - - if (TransactionRouter::get(opCtx)) { - validateWriteConcernForTransaction(wc, commandName); - } - if (supportsWriteConcern) { - auto& provenance = wc.getProvenance(); - - // ClientSupplied is the only provenance that clients are allowed to pass to mongos. - if (provenance.hasSource() && !provenance.isClientSupplied()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status{ErrorCodes::InvalidOptions, - "writeConcern provenance must be unset or \"{}\""_format( - ReadWriteConcernProvenance::kClientSupplied)}); - return Status::OK(); + if (command->getName() == CommitTransaction::kCommandName) { + return TransactionRouter::TransactionActions::kCommit; } - // If the client didn't provide a provenance, then an appropriate value needs to be - // determined. - if (!provenance.hasSource()) { - if (clientSuppliedWriteConcern) { - provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); - } else if (customDefaultWriteConcernWasApplied) { - provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); - } else { - provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); - } - } + return TransactionRouter::TransactionActions::kContinue; + })(); - // Ensure that the WC being set on the opCtx has provenance. - invariant(wc.getProvenance().hasSource(), - str::stream() - << "unexpected unset provenance on writeConcern: " << wc.toBSON()); + startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); + } - opCtx->setWriteConcern(wc); - } + bool supportsWriteConcern = invocation->supportsWriteConcern(); + if (!supportsWriteConcern && request.body.hasField(WriteConcernOptions::kWriteConcernField)) { + // This command doesn't do writes so it should not be passed a writeConcern. + const auto errorMsg = "Command does not support writeConcern"; + return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg}); + } - bool clientSuppliedReadConcern = readConcernArgs.isSpecified(); - bool customDefaultReadConcernWasApplied = false; - - auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); - if (readConcernSupport.defaultReadConcernPermit.isOK() && - (startTransaction || !TransactionRouter::get(opCtx))) { - if (readConcernArgs.isEmpty()) { - const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) - .getDefaultReadConcern(opCtx); - if (rcDefault) { - { - // We must obtain the client lock to set ReadConcernArgs, because it's an - // in-place reference to the object on the operation context, which may be - // concurrently used elsewhere (eg. read by currentOp). - stdx::lock_guard<Client> lk(*opCtx->getClient()); - readConcernArgs = std::move(*rcDefault); - } - customDefaultReadConcernWasApplied = true; - LOGV2_DEBUG(22767, - 2, - "Applying default readConcern on {command} of {readConcern}", - "Applying default readConcern on command", - "command"_attr = invocation->definition()->getName(), - "readConcern"_attr = *rcDefault); - // Update the readConcernSupport, since the default RC was applied. - readConcernSupport = - invocation->supportsReadConcern(readConcernArgs.getLevel()); - } - } + bool clientSuppliedWriteConcern = !_parc->_wc->usedDefault; + bool customDefaultWriteConcernWasApplied = false; + + if (supportsWriteConcern && !clientSuppliedWriteConcern && + (!TransactionRouter::get(opCtx) || isTransactionCommand(_parc->_commandName))) { + // This command supports WC, but wasn't given one - so apply the default, if there is one. + if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) + .getDefaultWriteConcern(opCtx)) { + _parc->_wc = *wcDefault; + customDefaultWriteConcernWasApplied = true; + LOGV2_DEBUG(22766, + 2, + "Applying default writeConcern on {command} of {writeConcern}", + "Applying default writeConcern on command", + "command"_attr = request.getCommandName(), + "writeConcern"_attr = *wcDefault); } + } - auto& provenance = readConcernArgs.getProvenance(); + if (TransactionRouter::get(opCtx)) { + validateWriteConcernForTransaction(*_parc->_wc, _parc->_commandName); + } + + if (supportsWriteConcern) { + auto& provenance = _parc->_wc->getProvenance(); // ClientSupplied is the only provenance that clients are allowed to pass to mongos. if (provenance.hasSource() && !provenance.isClientSupplied()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status{ErrorCodes::InvalidOptions, - "readConcern provenance must be unset or \"{}\""_format( - ReadWriteConcernProvenance::kClientSupplied)}); - return Status::OK(); + const auto errorMsg = "writeConcern provenance must be unset or \"{}\""_format( + ReadWriteConcernProvenance::kClientSupplied); + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); } // If the client didn't provide a provenance, then an appropriate value needs to be // determined. if (!provenance.hasSource()) { - // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs - // as it may be concurrently read by CurrentOp. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - if (clientSuppliedReadConcern) { + if (clientSuppliedWriteConcern) { provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); - } else if (customDefaultReadConcernWasApplied) { + } else if (customDefaultWriteConcernWasApplied) { provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); } else { provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); } } - // Ensure that the RC on the opCtx has provenance. - invariant(readConcernArgs.getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on readConcern: " - << readConcernArgs.toBSONInner()); - - // If we are starting a transaction, we only need to check whether the read concern is - // appropriate for running a transaction. There is no need to check whether the specific - // command supports the read concern, because all commands that are allowed to run in a - // transaction must support all applicable read concerns. - if (startTransaction) { - if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - {ErrorCodes::InvalidOptions, - "The readConcern level must be either 'local' (default), 'majority' or " - "'snapshot' in order to run in a transaction"}); - return Status::OK(); - } - if (readConcernArgs.getArgsOpTime()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - {ErrorCodes::InvalidOptions, - str::stream() - << "The readConcern cannot specify '" - << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"}); - return Status::OK(); - } - } + // Ensure that the WC being set on the opCtx has provenance. + invariant(_parc->_wc->getProvenance().hasSource(), + "unexpected unset provenance on writeConcern: {}"_format( + _parc->_wc->toBSON().toString())); + + opCtx->setWriteConcern(*_parc->_wc); + } - // Otherwise, if there is a read concern present - either user-specified or the default - - // then check whether the command supports it. If there is no explicit read concern level, - // then it is implicitly "local". There is no need to check whether this is supported, - // because all commands either support "local" or upconvert the absent readConcern to a - // stronger level that they do support; e.g. $changeStream upconverts to RC "majority". - // - // Individual transaction statements are checked later on, after we've unstashed the - // transaction resources. - if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) { - if (!readConcernSupport.readConcernSupport.isOK()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - readConcernSupport.readConcernSupport.withContext( - str::stream() << "Command " << invocation->definition()->getName() - << " does not support " << readConcernArgs.toString())); - return Status::OK(); + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + bool clientSuppliedReadConcern = readConcernArgs.isSpecified(); + bool customDefaultReadConcernWasApplied = false; + + auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); + if (readConcernSupport.defaultReadConcernPermit.isOK() && + (startTransaction || !TransactionRouter::get(opCtx))) { + if (readConcernArgs.isEmpty()) { + const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) + .getDefaultReadConcern(opCtx); + if (rcDefault) { + { + // We must obtain the client lock to set ReadConcernArgs, because it's an + // in-place reference to the object on the operation context, which may be + // concurrently used elsewhere (eg. read by currentOp). + stdx::lock_guard<Client> lk(*opCtx->getClient()); + readConcernArgs = std::move(*rcDefault); + } + customDefaultReadConcernWasApplied = true; + LOGV2_DEBUG(22767, + 2, + "Applying default readConcern on {command} of {readConcern}", + "Applying default readConcern on command", + "command"_attr = invocation->definition()->getName(), + "readConcern"_attr = *rcDefault); + // Update the readConcernSupport, since the default RC was applied. + readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); } } + } - // Remember whether or not this operation is starting a transaction, in case something later - // in the execution needs to adjust its behavior based on this. - opCtx->setIsStartingMultiDocumentTransaction(startTransaction); + auto& provenance = readConcernArgs.getProvenance(); - command->incrementCommandsExecuted(); + // ClientSupplied is the only provenance that clients are allowed to pass to mongos. + if (provenance.hasSource() && !provenance.isClientSupplied()) { + const auto errorMsg = "readConcern provenance must be unset or \"{}\""_format( + ReadWriteConcernProvenance::kClientSupplied); + return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg}); + } - auto shouldAffectCommandCounter = command->shouldAffectCommandCounter(); + // If the client didn't provide a provenance, then an appropriate value needs to be determined. + if (!provenance.hasSource()) { + // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs as it + // may be concurrently read by CurrentOp. + stdx::lock_guard<Client> lk(*opCtx->getClient()); + if (clientSuppliedReadConcern) { + provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); + } else if (customDefaultReadConcernWasApplied) { + provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); + } else { + provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); + } + } - if (shouldAffectCommandCounter) { - globalOpCounters.gotCommand(); + // Ensure that the RC on the opCtx has provenance. + invariant(readConcernArgs.getProvenance().hasSource(), + "unexpected unset provenance on readConcern: {}"_format( + readConcernArgs.toBSONInner().toString())); + + // If we are starting a transaction, we only need to check whether the read concern is + // appropriate for running a transaction. There is no need to check whether the specific command + // supports the read concern, because all commands that are allowed to run in a transaction must + // support all applicable read concerns. + if (startTransaction) { + if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) { + const auto errorMsg = + "The readConcern level must be either 'local' (default), 'majority' or " + "'snapshot' in order to run in a transaction"; + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); } + if (readConcernArgs.getArgsOpTime()) { + const std::string errorMsg = + "The readConcern cannot specify '{}' in a transaction"_format( + repl::ReadConcernArgs::kAfterOpTimeFieldName); + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); + } + } - ON_BLOCK_EXIT([opCtx, shouldAffectCommandCounter] { - if (shouldAffectCommandCounter) { - Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh( - opCtx, mongo::LogicalOp::opCommand); - } - }); + // Otherwise, if there is a read concern present - either user-specified or the default - then + // check whether the command supports it. If there is no explicit read concern level, then it is + // implicitly "local". There is no need to check whether this is supported, because all commands + // either support "local" or upconvert the absent readConcern to a stronger level that they do + // support; e.g. $changeStream upconverts to RC "majority". + // + // Individual transaction statements are checked later on, after we've unstashed the transaction + // resources. + if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) { + if (!readConcernSupport.readConcernSupport.isOK()) { + const std::string errorMsg = "Command {} does not support {}"_format( + invocation->definition()->getName(), readConcernArgs.toString()); + return appendStatusToReplyAndSkipCommandExecution( + readConcernSupport.readConcernSupport.withContext(errorMsg)); + } + } + // Remember whether or not this operation is starting a transaction, in case something later in + // the execution needs to adjust its behavior based on this. + opCtx->setIsStartingMultiDocumentTransaction(startTransaction); - for (int tries = 0;; ++tries) { - // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. - bool canRetry = tries < kMaxNumStaleVersionRetries - 1; + command->incrementCommandsExecuted(); - if (tries > 0) { - // Re-parse before retrying in case the process of run()-ning the - // invocation could affect the parsed result. - invocation = command->parse(opCtx, request); - invariant(invocation->ns().toString() == ns, - "unexpected change of namespace when retrying"); - } + if (command->shouldAffectCommandCounter()) { + globalOpCounters.gotCommand(); + _shouldAffectCommandCounter = true; + } - // On each try, select the latest known clusterTime as the atClusterTime for snapshot - // reads outside of transactions. - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && - !TransactionRouter::get(opCtx) && - (!readConcernArgs.getArgsAtClusterTime() || - readConcernArgs.wasAtClusterTimeSelected())) { - auto atClusterTime = [&] { - auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime(); - // Choose a time after the user-supplied afterClusterTime. - auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime(); - if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) { - return afterClusterTime->asTimestamp(); - } - return latestKnownClusterTime.asTimestamp(); - }(); - readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); + return Status::OK(); +} + +void ParseAndRunCommand::RunAndRetry::_setup() { + auto opCtx = _parc->_rec->getOpCtx(); + const auto command = _parc->_rec->getCommand(); + const auto& request = _parc->_rec->getRequest(); + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + + if (_tries > 1) { + // Re-parse before retrying in case the process of run()-ning the invocation could affect + // the parsed result. + _parc->_invocation = command->parse(opCtx, request); + invariant(_parc->_invocation->ns().toString() == _parc->_ns, + "unexpected change of namespace when retrying"); + } + + // On each try, select the latest known clusterTime as the atClusterTime for snapshot reads + // outside of transactions. + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && + !TransactionRouter::get(opCtx) && + (!readConcernArgs.getArgsAtClusterTime() || readConcernArgs.wasAtClusterTimeSelected())) { + auto atClusterTime = [](OperationContext* opCtx, ReadConcernArgs& readConcernArgs) { + auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime(); + // Choose a time after the user-supplied afterClusterTime. + auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime(); + if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) { + return afterClusterTime->asTimestamp(); } + return latestKnownClusterTime.asTimestamp(); + }(opCtx, readConcernArgs); + readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); + } - replyBuilder->reset(); - try { - execCommandClient(opCtx, invocation.get(), request, replyBuilder); + _parc->_rec->getReplyBuilder()->reset(); +} - auto responseBuilder = replyBuilder->getBodyBuilder(); - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter.appendRecoveryToken(&responseBuilder); - } +Future<void> ParseAndRunCommand::RunAndRetry::_run() try { + auto opCtx = _parc->_rec->getOpCtx(); + auto replyBuilder = _parc->_rec->getReplyBuilder(); - return Status::OK(); - } catch (ShardInvalidatedForTargetingException& ex) { - auto catalogCache = Grid::get(opCtx)->catalogCache(); - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } + execCommandClient(opCtx, _parc->_invocation.get(), _parc->_rec->getRequest(), replyBuilder); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - (void)catalogCache->getCollectionRoutingInfoWithRefresh( - opCtx, ex.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss()); - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto responseBuilder = replyBuilder->getBodyBuilder(); + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.appendRecoveryToken(&responseBuilder); + } - // The error is retryable, so update transaction state before retrying. - txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + return Future<void>::makeReady(Status::OK()); +} catch (const DBException& ex) { + return ex.toStatus(); +} - abortGuard.dismiss(); - continue; - } +void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) { + // Retry logic specific to transactions. Throws and aborts the transaction if the error cannot + // be retried on. + auto opCtx = _parc->_rec->getOpCtx(); + auto txnRouter = TransactionRouter::get(opCtx); + if (!txnRouter) + return; - if (canRetry) { - continue; - } - throw; - } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) { - const auto staleNs = [&] { - if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { - return staleInfo->getNss(); - } - throw; - }(); + auto abortGuard = makeGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); }); + + if (!_canRetry()) { + addContextForTransactionAbortingError( + txnRouter.txnIdToString(), txnRouter.getLatestStmtId(), status, "exhausted retries"); + internalAssert(status); + } + // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, or shard + // invalidated for targeting errors. + if (ErrorCodes::isA<ErrorCategory::SnapshotError>(status)) { + if (!txnRouter.canContinueOnSnapshotError()) { + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + status, + "a non-retryable snapshot error"); + internalAssert(status); + } + + // The error is retryable, so update transaction state before retrying. + txnRouter.onSnapshotError(opCtx, status); + } else { + invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status) || + status.code() == ErrorCodes::ShardInvalidatedForTargeting || + status.code() == ErrorCodes::StaleDbVersion); + + if (!txnRouter.canContinueOnStaleShardOrDbError(_parc->_commandName, status)) { + if (status.code() == ErrorCodes::ShardInvalidatedForTargeting) { auto catalogCache = Grid::get(opCtx)->catalogCache(); - if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { - catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); - } else { - // If we don't have the stale config info and therefore don't know the shard's - // id, we have to force all further targetting requests for the namespace to - // block on a refresh. - catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs); - } + (void)catalogCache->getCollectionRoutingInfoWithRefresh( + opCtx, status.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss()); + } + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + status, + "an error from cluster data placement change"); + internalAssert(status); + } - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); + // The error is retryable, so update transaction state before retrying. + txnRouter.onStaleShardOrDbError(opCtx, _parc->_commandName, status); + } - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); + abortGuard.dismiss(); +} - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } +void ParseAndRunCommand::RunAndRetry::_onShardInvalidatedForTargeting(Status& status) { + invariant(status.code() == ErrorCodes::ShardInvalidatedForTargeting); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto opCtx = _parc->_rec->getOpCtx(); + auto catalogCache = Grid::get(opCtx)->catalogCache(); + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - // The error is retryable, so update transaction state before retrying. - txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + _checkRetryForTransaction(status); - abortGuard.dismiss(); - continue; - } + if (!_canRetry()) + internalAssert(status); +} - if (canRetry) { - continue; - } - throw; - } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) { - // Mark database entry in cache as stale. - Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(), - ex->getVersionWanted()); - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } +void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) { + invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status)); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto staleInfo = status.extraInfo<StaleConfigInfo>(); + if (!staleInfo) + internalAssert(status); - // The error is retryable, so update transaction state before retrying. - txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + auto opCtx = _parc->_rec->getOpCtx(); + const auto staleNs = staleInfo->getNss(); + auto catalogCache = Grid::get(opCtx)->catalogCache(); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); - abortGuard.dismiss(); - continue; - } + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - if (canRetry) { - continue; - } - throw; - } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) { - // Simple retry on any type of snapshot error. - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } + _checkRetryForTransaction(status); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnSnapshotError()) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "a non-retryable snapshot error"); - throw; - } + if (!_canRetry()) + internalAssert(status); +} - // The error is retryable, so update transaction state before retrying. - txnRouter.onSnapshotError(opCtx, ex.toStatus()); +void ParseAndRunCommand::RunAndRetry::_onStaleDbVersion(Status& status) { + invariant(status.code() == ErrorCodes::StaleDbVersion); + auto opCtx = _parc->_rec->getOpCtx(); - abortGuard.dismiss(); - continue; - } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) { - // Non-transaction snapshot read. The client sent readConcern: {level: - // "snapshot", atClusterTime: T}, where T is older than - // minSnapshotHistoryWindowInSeconds, retrying won't succeed. - throw; - } + // Mark database entry in cache as stale. + auto extraInfo = status.extraInfo<StaleDbRoutingVersion>(); + invariant(extraInfo); + Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(extraInfo->getDb(), + extraInfo->getVersionWanted()); - if (canRetry) { - continue; - } - throw; - } - MONGO_UNREACHABLE; - } - } catch (const DBException& e) { - command->incrementCommandsFailed(); - LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason()); - // WriteConcern error (wcCode) is set to boost::none because: - // 1. TransientTransaction error label handling for commitTransaction command in mongos is - // delegated to the shards. Mongos simply propagates the shard's response up to the - // client. - // 2. For other commands in a transaction, they shouldn't get a writeConcern error so - // this setting doesn't apply. - // - // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError - // label. - auto errorLabels = getErrorLabels( - opCtx, osi, command->getName(), e.code(), boost::none, true /* isInternalClient */); - errorBuilder->appendElements(errorLabels); - throw; + _checkRetryForTransaction(status); + + if (!_canRetry()) + internalAssert(status); +} + +void ParseAndRunCommand::RunAndRetry::_onSnapshotError(Status& status) { + // Simple retry on any type of snapshot error. + invariant(ErrorCodes::isA<ErrorCategory::SnapshotError>(status)); + + _checkRetryForTransaction(status); + + auto opCtx = _parc->_rec->getOpCtx(); + if (auto txnRouter = TransactionRouter::get(opCtx); + !txnRouter && !ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) { + // Non-transaction snapshot read. The client sent readConcern: {level: "snapshot", + // atClusterTime: T}, where T is older than minSnapshotHistoryWindowInSeconds, retrying + // won't succeed. + internalAssert(status); } - return Status::OK(); -} catch (const DBException& e) { - return e.toStatus(); + + if (!_canRetry()) + internalAssert(status); +} + +void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) { + auto opCtx = _parc->_rec->getOpCtx(); + const auto command = _parc->_rec->getCommand(); + + command->incrementCommandsFailed(); + LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason()); + // WriteConcern error (wcCode) is set to boost::none because: + // 1. TransientTransaction error label handling for commitTransaction command in mongos is + // delegated to the shards. Mongos simply propagates the shard's response up to the client. + // 2. For other commands in a transaction, they shouldn't get a writeConcern error so this + // setting doesn't apply. + // + // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError + // label. + auto errorLabels = getErrorLabels(opCtx, + *_parc->_osi, + command->getName(), + status.code(), + boost::none, + true /* isInternalClient */); + _parc->_errorBuilder->appendElements(errorLabels); +} + +Future<void> ParseAndRunCommand::RunInvocation::_runAndRetry() { + auto instance = std::make_shared<RunAndRetry>(_parc); + return instance->run(); +} + +Future<void> ParseAndRunCommand::_runInvocation() { + auto ri = std::make_shared<RunInvocation>(shared_from_this()); + return ri->run(); +} + +Future<void> ParseAndRunCommand::RunAndRetry::run() { + // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. + _tries++; + + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { _setup(); }) + .then([this, anchor = shared_from_this()] { + return _run() + .onError<ErrorCodes::ShardInvalidatedForTargeting>( + [this, anchor = shared_from_this()](Status status) { + _onShardInvalidatedForTargeting(status); + return run(); // Retry + }) + .onErrorCategory<ErrorCategory::NeedRetargettingError>( + [this, anchor = shared_from_this()](Status status) { + _onNeedRetargetting(status); + return run(); // Retry + }) + .onError<ErrorCodes::StaleDbVersion>( + [this, anchor = shared_from_this()](Status status) { + _onStaleDbVersion(status); + return run(); // Retry + }) + .onErrorCategory<ErrorCategory::SnapshotError>( + [this, anchor = shared_from_this()](Status status) { + _onSnapshotError(status); + return run(); // Retry + }); + }); + pf.promise.emplaceValue(); + return future; +} + +Future<void> ParseAndRunCommand::RunInvocation::run() { + auto pf = makePromiseFuture<void>(); + auto future = + std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _setup(); }) + .then([this, anchor = shared_from_this()] { return _runAndRetry(); }) + .tapError([this, anchor = shared_from_this()](Status status) { _tapOnError(status); }); + pf.promise.emplaceValue(); + return future; +} + +Future<void> ParseAndRunCommand::run() { + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _prologue(); }) + .then([this, anchor = shared_from_this()] { return _runInvocation(); }) + .onError([this, anchor = shared_from_this()](Status status) { + if (status.code() == ErrorCodes::SkipCommandExecution) + // We've already skipped execution, so no other action is required. + return Status::OK(); + return status; + }); + pf.promise.emplaceValue(); + return future; +} + +/** + * Executes the command for the given request, and appends the result to replyBuilder + * and error labels, if any, to errorBuilder. + */ +Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<BSONObjBuilder> errorBuilder) { + auto instance = std::make_shared<ParseAndRunCommand>(std::move(rec), std::move(errorBuilder)); + return instance->run(); } } // namespace |