Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/request_execution_context.h              |   11
-rw-r--r--  src/mongo/db/service_entry_point_common.cpp           |    5
-rw-r--r--  src/mongo/s/commands/cluster_command_test_fixture.cpp |    4
-rw-r--r--  src/mongo/s/commands/strategy.cpp                     | 1499
-rw-r--r--  src/mongo/s/commands/strategy.h                       |    5
-rw-r--r--  src/mongo/s/service_entry_point_mongos.cpp            |  303
6 files changed, 1073 insertions, 754 deletions
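The recurring pattern in this diff is the conversion of straight-line, exception-based command execution into future-chains: each stage object (`ClientCommand`, `ParseAndRunCommand`, `ExecCommandClient`) is heap-allocated behind a `shared_ptr`, and every continuation captures `anchor = shared_from_this()` so the stage outlives the scope that started it. Below is a minimal, self-contained sketch of that lifetime technique using only the standard library; the queue-based "executor", the `Step` alias, and the class name are illustrative stand-ins, not MongoDB's actual `Future`/`Promise` machinery.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Stand-in for a deferred-execution queue; the real patch chains mongo::Future
// continuations instead, but the ownership question is the same.
using Step = std::function<void()>;

class ParseAndRunSketch : public std::enable_shared_from_this<ParseAndRunSketch> {
public:
    // Builds the "chain". Each step captures `anchor`, a shared_ptr to this
    // object, so the object stays alive until the last step has run, even if
    // the caller drops its own shared_ptr immediately after calling run().
    void run(std::vector<Step>& executor) {
        executor.push_back([this, anchor = shared_from_this()] { _prologue(); });
        executor.push_back([this, anchor = shared_from_this()] { _execute(); });
    }

private:
    void _prologue() { std::cout << "parse and validate\n"; }
    void _execute() { std::cout << "run invocation\n"; }
};

int main() {
    std::vector<Step> executor;
    std::make_shared<ParseAndRunSketch>()->run(executor);  // caller keeps no reference
    for (auto& step : executor)
        step();  // steps still own the object through their anchors
}
```

The same anchoring appears throughout the patch, e.g. in `ExecCommandClient::run()` and `ParseAndRunCommand::RunAndRetry::run()` below.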
diff --git a/src/mongo/db/request_execution_context.h b/src/mongo/db/request_execution_context.h index b34c7e4f3b7..44f06f36369 100644 --- a/src/mongo/db/request_execution_context.h +++ b/src/mongo/db/request_execution_context.h @@ -57,22 +57,21 @@ public: RequestExecutionContext(const RequestExecutionContext&) = delete; RequestExecutionContext(RequestExecutionContext&&) = delete; - explicit RequestExecutionContext(OperationContext* opCtx) : _opCtx(opCtx) {} + RequestExecutionContext(OperationContext* opCtx, Message message) + : _opCtx(opCtx), + _message(std::move(message)), + _dbmsg(std::make_unique<DbMessage>(_message.get())) {} auto getOpCtx() const { invariant(_isOnClientThread()); return _opCtx; } - void setMessage(Message message) { - invariant(_isOnClientThread() && !_message); - _message = std::move(message); - _dbmsg = std::make_unique<DbMessage>(_message.get()); - } const Message& getMessage() const { invariant(_isOnClientThread() && _message); return _message.get(); } + DbMessage& getDbMessage() const { invariant(_isOnClientThread() && _dbmsg); return *_dbmsg.get(); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 528b37d071e..26f9c9266b3 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -141,10 +141,7 @@ struct HandleRequest { ExecutionContext(OperationContext* opCtx, Message msg, std::unique_ptr<const ServiceEntryPointCommon::Hooks> hooks) - : RequestExecutionContext(opCtx), behaviors(std::move(hooks)) { - // It also initializes dbMessage, which is accessible via getDbMessage() - setMessage(std::move(msg)); - } + : RequestExecutionContext(opCtx, std::move(msg)), behaviors(std::move(hooks)) {} ~ExecutionContext() = default; Client& client() const { diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp index 316fc78782f..8c8813a3b11 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.cpp +++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp @@ -122,7 +122,9 @@ DbResponse ClusterCommandTestFixture::runCommand(BSONObj cmd) { auto clusterGLE = ClusterLastErrorInfo::get(client.get()); clusterGLE->newRequest(); - return Strategy::clientCommand(opCtx.get(), opMsgRequest.serialize()); + AlternativeClientRegion acr(client); + auto rec = std::make_shared<RequestExecutionContext>(opCtx.get(), opMsgRequest.serialize()); + return Strategy::clientCommand(std::move(rec)).get(); } void ClusterCommandTestFixture::runCommandSuccessful(BSONObj cmd, bool isTargeted) { diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 44a5f0ad044..2523b22204d 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -156,30 +156,28 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res /** * Invokes the given command and aborts the transaction on any non-retryable errors. */ -void invokeInTransactionRouter(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - rpc::ReplyBuilderInterface* result) { +Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation) { + auto opCtx = rec->getOpCtx(); auto txnRouter = TransactionRouter::get(opCtx); invariant(txnRouter); // No-op if the transaction is not running with snapshot read concern. 
txnRouter.setDefaultAtClusterTime(opCtx); - try { - CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); - } catch (const DBException& e) { - if (ErrorCodes::isSnapshotError(e.code()) || - ErrorCodes::isNeedRetargettingError(e.code()) || - e.code() == ErrorCodes::ShardInvalidatedForTargeting || - e.code() == ErrorCodes::StaleDbVersion) { - // Don't abort on possibly retryable errors. - throw; - } + return CommandHelpers::runCommandInvocationAsync(rec, std::move(invocation)) + .tapError([rec = std::move(rec)](Status status) { + if (auto code = status.code(); ErrorCodes::isSnapshotError(code) || + ErrorCodes::isNeedRetargettingError(code) || + code == ErrorCodes::ShardInvalidatedForTargeting || + code == ErrorCodes::StaleDbVersion) { + // Don't abort on possibly retryable errors. + return; + } - txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus()); - throw; - } + auto opCtx = rec->getOpCtx(); + TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status); + }); } /** @@ -187,139 +185,314 @@ void invokeInTransactionRouter(OperationContext* opCtx, */ void addContextForTransactionAbortingError(StringData txnIdAsString, StmtId latestStmtId, - DBException& ex, + Status& status, StringData reason) { - ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement " - << latestStmtId << " due to: " << reason); + status.addContext("Transaction {} was aborted on statement {} due to: {}"_format( + txnIdAsString, latestStmtId, reason)); } -void execCommandClient(OperationContext* opCtx, - CommandInvocation* invocation, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* result) { - [&] { - const Command* c = invocation->definition(); - - const auto dbname = request.getDatabase(); - uassert(ErrorCodes::IllegalOperation, - "Can't use 'local' database through mongos", - dbname != NamespaceString::kLocalDb); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", - NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); +// Factory class to construct a future-chain that executes the invocation against the database. +class ExecCommandClient final : public std::enable_shared_from_this<ExecCommandClient> { +public: + ExecCommandClient(ExecCommandClient&&) = delete; + ExecCommandClient(const ExecCommandClient&) = delete; - StringMap<int> topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == "help" && element.type() == Bool && element.Bool()) { - std::stringstream help; - help << "help for: " << c->getName() << " " << c->help(); - auto body = result->getBodyBuilder(); - body.append("help", help.str()); - CommandHelpers::appendSimpleCommandStatus(body, true, ""); - return; - } + ExecCommandClient(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation) + : _rec(std::move(rec)), _invocation(std::move(invocation)) {} - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } + Future<void> run(); - try { - invocation->checkAuthorization(opCtx, request); - } catch (const DBException& e) { +private: + // Prepare the environment for running the invocation (e.g., checking authorization). + Status _prologue(); + + // Returns a future that runs the command invocation. 
+ Future<void> _run(); + + // Any logic that must be done post command execution, unless an exception is thrown. + void _epilogue(); + + // Runs at the end of the future-chain returned by `run()` unless an exception, other than + // `ErrorCodes::SkipCommandExecution`, is thrown earlier. + void _onCompletion(); + + const std::shared_ptr<RequestExecutionContext> _rec; + const std::shared_ptr<CommandInvocation> _invocation; +}; + +Status ExecCommandClient::_prologue() { + auto opCtx = _rec->getOpCtx(); + auto result = _rec->getReplyBuilder(); + const auto& request = _rec->getRequest(); + const Command* c = _invocation->definition(); + + const auto dbname = request.getDatabase(); + uassert(ErrorCodes::IllegalOperation, + "Can't use 'local' database through mongos", + dbname != NamespaceString::kLocalDb); + uassert(ErrorCodes::InvalidNamespace, + "Invalid database name: '{}'"_format(dbname), + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + + StringMap<int> topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == "help" && element.type() == Bool && element.Bool()) { auto body = result->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus()); - return; + body.append("help", "help for: {} {}"_format(c->getName(), c->help())); + CommandHelpers::appendSimpleCommandStatus(body, true, ""); + return {ErrorCodes::SkipCommandExecution, "Already served help command"}; } - // attach tracking - rpc::TrackingMetadata trackingMetadata; - trackingMetadata.initWithOperName(c->getName()); - rpc::TrackingMetadata::get(opCtx) = trackingMetadata; + uassert(ErrorCodes::FailedToParse, + "Parsed command object contains duplicate top level key: {}"_format(fieldName), + topLevelFields[fieldName]++ == 0); + } - // Extract and process metadata from the command request body. - ReadPreferenceSetting::get(opCtx) = - uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body)); - VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth()); + try { + _invocation->checkAuthorization(opCtx, request); + } catch (const DBException& e) { + auto body = result->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus()); + return {ErrorCodes::SkipCommandExecution, "Failed to check authorization"}; + } - auto txnRouter = TransactionRouter::get(opCtx); - if (txnRouter) { - invokeInTransactionRouter(opCtx, request, invocation, result); - } else { - CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); - } + // attach tracking + rpc::TrackingMetadata trackingMetadata; + trackingMetadata.initWithOperName(c->getName()); + rpc::TrackingMetadata::get(opCtx) = trackingMetadata; + + // Extract and process metadata from the command request body. 
+ ReadPreferenceSetting::get(opCtx) = + uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body)); + VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth()); + + return Status::OK(); +} - if (invocation->supportsWriteConcern()) { - failCommand.executeIf( - [&](const BSONObj& data) { - result->getBodyBuilder().append(data["writeConcernError"]); - if (data.hasField(kErrorLabelsFieldName) && - data[kErrorLabelsFieldName].type() == Array) { - auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned(); - if (!labels.isEmpty()) { - result->getBodyBuilder().append(kErrorLabelsFieldName, - BSONArray(labels)); - } +Future<void> ExecCommandClient::_run() { + OperationContext* opCtx = _rec->getOpCtx(); + if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) { + return invokeInTransactionRouter(_rec, _invocation); + } else { + return CommandHelpers::runCommandInvocationAsync(_rec, _invocation); + } +} + +void ExecCommandClient::_epilogue() { + auto opCtx = _rec->getOpCtx(); + auto result = _rec->getReplyBuilder(); + if (_invocation->supportsWriteConcern()) { + failCommand.executeIf( + [&](const BSONObj& data) { + result->getBodyBuilder().append(data["writeConcernError"]); + if (data.hasField(kErrorLabelsFieldName) && + data[kErrorLabelsFieldName].type() == Array) { + auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned(); + if (!labels.isEmpty()) { + result->getBodyBuilder().append(kErrorLabelsFieldName, BSONArray(labels)); } - }, - [&](const BSONObj& data) { - return CommandHelpers::shouldActivateFailCommandFailPoint( - data, invocation, opCtx->getClient()) && - data.hasField("writeConcernError"); - }); - } + } + }, + [&](const BSONObj& data) { + return CommandHelpers::shouldActivateFailCommandFailPoint( + data, _invocation.get(), opCtx->getClient()) && + data.hasField("writeConcernError"); + }); + } - auto body = result->getBodyBuilder(); + auto body = result->getBodyBuilder(); - bool ok = CommandHelpers::extractOrAppendOk(body); - if (!ok) { - c->incrementCommandsFailed(); + if (bool ok = CommandHelpers::extractOrAppendOk(body); !ok) { + const Command* c = _invocation->definition(); + c->incrementCommandsFailed(); - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter.implicitlyAbortTransaction(opCtx, - getStatusFromCommandResult(body.asTempObj())); - } + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.implicitlyAbortTransaction(opCtx, + getStatusFromCommandResult(body.asTempObj())); } - }(); + } +} - auto body = result->getBodyBuilder(); +void ExecCommandClient::_onCompletion() { + auto opCtx = _rec->getOpCtx(); + auto body = _rec->getReplyBuilder()->getBodyBuilder(); appendRequiredFieldsToResponse(opCtx, &body); + + // TODO SERVER-49109 enable the following code-path. + /* + auto seCtx = transport::ServiceExecutorContext::get(opCtx->getClient()); + if (!seCtx) { + // We were run by a background worker. + return; + } + + if (!_invocation->isSafeForBorrowedThreads()) { + // If the last command wasn't safe for a borrowed thread, then let's move off of it. 
+ seCtx->setThreadingModel(transport::ServiceExecutor::ThreadingModel::kDedicated); + } + */ +} + +Future<void> ExecCommandClient::run() { + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _prologue(); }) + .then([this, anchor = shared_from_this()] { return _run(); }) + .then([this, anchor = shared_from_this()] { _epilogue(); }) + .onCompletion([this, anchor = shared_from_this()](Status status) { + if (!status.isOK() && status.code() != ErrorCodes::SkipCommandExecution) + return status; // Execution was interrupted due to an error. + + _onCompletion(); + return Status::OK(); + }); + pf.promise.emplaceValue(); + return future; } MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError); /** - * Executes the command for the given request, and appends the result to replyBuilder - * and error labels, if any, to errorBuilder. + * Produces a future-chain that parses the command, runs the parsed command, and captures the result + * in replyBuilder. */ -void runCommand(OperationContext* opCtx, - const OpMsgRequest& request, - const Message& m, - rpc::ReplyBuilderInterface* replyBuilder, - BSONObjBuilder* errorBuilder) { - auto const opType = m.operation(); - auto const commandName = request.getCommandName(); - auto const command = CommandHelpers::findCommand(commandName); +class ParseAndRunCommand final : public std::enable_shared_from_this<ParseAndRunCommand> { +public: + ParseAndRunCommand(const ParseAndRunCommand&) = delete; + ParseAndRunCommand(ParseAndRunCommand&&) = delete; + + ParseAndRunCommand(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<BSONObjBuilder> errorBuilder) + : _rec(std::move(rec)), + _errorBuilder(std::move(errorBuilder)), + _opType(_rec->getMessage().operation()), + _commandName(_rec->getRequest().getCommandName()) {} + + Future<void> run(); + +private: + class RunInvocation; + class RunAndRetry; + + // Prepares the environment for running the command (e.g., parsing the command to produce the + // invocation and extracting read/write concerns). + Status _prologue(); + + // Returns a future-chain that runs the parse invocation. + Future<void> _runInvocation(); + + const std::shared_ptr<RequestExecutionContext> _rec; + const std::shared_ptr<BSONObjBuilder> _errorBuilder; + const NetworkOp _opType; + const StringData _commandName; + + std::shared_ptr<CommandInvocation> _invocation; + boost::optional<std::string> _ns; + boost::optional<OperationSessionInfoFromClient> _osi; + boost::optional<WriteConcernOptions> _wc; + boost::optional<bool> _isHello; +}; + +/* + * Produces a future-chain to run the invocation and capture the result in replyBuilder. + */ +class ParseAndRunCommand::RunInvocation final + : public std::enable_shared_from_this<ParseAndRunCommand::RunInvocation> { +public: + RunInvocation(RunInvocation&&) = delete; + RunInvocation(const RunInvocation&) = delete; + + explicit RunInvocation(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {} + + ~RunInvocation() { + if (!_shouldAffectCommandCounter) + return; + auto opCtx = _parc->_rec->getOpCtx(); + Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh( + opCtx, mongo::LogicalOp::opCommand); + } + + Future<void> run(); + +private: + Status _setup(); + + // Returns a future-chain that runs the invocation and retries if necessary. + Future<void> _runAndRetry(); + + // Logs and updates statistics if an error occurs during `_setup()` or `_runAndRetry()`. 
+ void _tapOnError(const Status& status); + + const std::shared_ptr<ParseAndRunCommand> _parc; + + boost::optional<RouterOperationContextSession> _routerSession; + bool _shouldAffectCommandCounter = false; +}; + +/* + * Produces a future-chain that runs the invocation and retries if necessary. + */ +class ParseAndRunCommand::RunAndRetry final + : public std::enable_shared_from_this<ParseAndRunCommand::RunAndRetry> { +public: + RunAndRetry(RunAndRetry&&) = delete; + RunAndRetry(const RunAndRetry&) = delete; + + explicit RunAndRetry(std::shared_ptr<ParseAndRunCommand> parc) : _parc(std::move(parc)) {} + + Future<void> run(); + +private: + bool _canRetry() const { + return _tries < kMaxNumStaleVersionRetries; + } + + // Sets up the environment for running the invocation, and clears the state from the last try. + void _setup(); + + Future<void> _run(); + + // Exception handler for error codes that may trigger a retry. All methods will throw `status` + // unless an attempt to retry is possible. + void _checkRetryForTransaction(Status& status); + void _onShardInvalidatedForTargeting(Status& status); + void _onNeedRetargetting(Status& status); + void _onStaleDbVersion(Status& status); + void _onSnapshotError(Status& status); + + const std::shared_ptr<ParseAndRunCommand> _parc; + + int _tries = 0; +}; + +Status ParseAndRunCommand::_prologue() { + auto opCtx = _rec->getOpCtx(); + const auto& m = _rec->getMessage(); + const auto& request = _rec->getRequest(); + auto replyBuilder = _rec->getReplyBuilder(); + + auto const command = CommandHelpers::findCommand(_commandName); if (!command) { + const std::string errorMsg = "no such cmd: {}"_format(_commandName); auto builder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - builder, - {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName}); + CommandHelpers::appendCommandStatusNoThrow(builder, + {ErrorCodes::CommandNotFound, errorMsg}); globalCommandRegistry()->incrementUnknownCommands(); appendRequiredFieldsToResponse(opCtx, &builder); - return; + return {ErrorCodes::SkipCommandExecution, errorMsg}; } - const auto isHello = command->getName() == "hello"_sd || command->getName() == "isMaster"_sd; + _rec->setCommand(command); + + _isHello.emplace(command->getName() == "hello"_sd || command->getName() == "isMaster"_sd); opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); const auto session = opCtx->getClient()->session(); if (session) { - if (!opCtx->isExhaust() || !isHello) { - InExhaustHello::get(session.get())->setInExhaust(false, commandName); + if (!opCtx->isExhaust() || !_isHello.get()) { + InExhaustHello::get(session.get())->setInExhaust(false, _commandName); } } @@ -348,30 +521,31 @@ void runCommand(OperationContext* opCtx, opCtx->setComment(commentField.wrap()); } - std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request); - CommandInvocation::set(opCtx, invocation); + _invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, _invocation); // Set the logical optype, command object and namespace as soon as we identify the command. If // the command does not define a fully-qualified namespace, set CurOp to the generic command // namespace db.$cmd. - std::string ns = invocation->ns().toString(); - auto nss = (request.getDatabase() == ns ? NamespaceString(ns, "$cmd") : NamespaceString(ns)); + _ns.emplace(_invocation->ns().toString()); + auto nss = + (request.getDatabase() == *_ns ? 
NamespaceString(*_ns, "$cmd") : NamespaceString(*_ns)); // Fill out all currentOp details. - CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, opType); + CurOp::get(opCtx)->setGenericOpRequestDetails(opCtx, nss, command, request.body, _opType); - auto osi = initializeOperationSessionInfo(opCtx, - request.body, - command->requiresAuth(), - command->attachLogicalSessionsToOpCtx(), - true); + _osi.emplace(initializeOperationSessionInfo(opCtx, + request.body, + command->requiresAuth(), + command->attachLogicalSessionsToOpCtx(), + true)); // TODO SERVER-28756: Change allowTransactionsOnConfigDatabase to true once we fix the bug // where the mongos custom write path incorrectly drops the client's txnNumber. auto allowTransactionsOnConfigDatabase = false; - validateSessionOptions(osi, command->getName(), nss, allowTransactionsOnConfigDatabase); + validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase); - auto wc = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)); + _wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body))); Client* client = opCtx->getClient(); auto const apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); @@ -389,519 +563,511 @@ void runCommand(OperationContext* opCtx, if (!readConcernParseStatus.isOK()) { auto builder = replyBuilder->getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus); - return; + return {ErrorCodes::SkipCommandExecution, "Failed to parse read concern"}; } - try { - if (isHello) { - // Preload generic ClientMetadata ahead of our first hello request. After the first - // request, metaElement should always be empty. - auto metaElem = request.body[kMetadataDocumentName]; - ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem); - } - - auto& apiParams = APIParameters::get(opCtx); - auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); - if (auto clientMetadata = ClientMetadata::get(client)) { - auto appName = clientMetadata->getApplicationName().toString(); - apiVersionMetrics.update(appName, apiParams); - } - - rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - - CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); - - boost::optional<RouterOperationContextSession> routerSession; - bool startTransaction = false; - if (osi.getAutocommit()) { - routerSession.emplace(opCtx); - - auto txnRouter = TransactionRouter::get(opCtx); - invariant(txnRouter); + return Status::OK(); +} - auto txnNumber = opCtx->getTxnNumber(); - invariant(txnNumber); +Status ParseAndRunCommand::RunInvocation::_setup() { + auto invocation = _parc->_invocation; + auto opCtx = _parc->_rec->getOpCtx(); + auto command = _parc->_rec->getCommand(); + const auto& request = _parc->_rec->getRequest(); + auto replyBuilder = _parc->_rec->getReplyBuilder(); + + auto appendStatusToReplyAndSkipCommandExecution = [replyBuilder](Status status) -> Status { + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow(responseBuilder, status); + return Status(ErrorCodes::SkipCommandExecution, status.reason()); + }; + + if (_parc->_isHello.get()) { + // Preload generic ClientMetadata ahead of our first hello request. After the first + // request, metaElement should always be empty. 
+ auto metaElem = request.body[kMetadataDocumentName]; + ClientMetadata::setFromMetadata(opCtx->getClient(), metaElem); + } - auto transactionAction = ([&] { - auto startTxnSetting = osi.getStartTransaction(); - if (startTxnSetting && *startTxnSetting) { - return TransactionRouter::TransactionActions::kStart; - } + auto& apiParams = APIParameters::get(opCtx); + auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); + if (auto clientMetadata = ClientMetadata::get(opCtx->getClient())) { + auto appName = clientMetadata->getApplicationName().toString(); + apiVersionMetrics.update(appName, apiParams); + } - if (command->getName() == CommitTransaction::kCommandName) { - return TransactionRouter::TransactionActions::kCommit; - } + rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - return TransactionRouter::TransactionActions::kContinue; - })(); + CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); + bool startTransaction = false; + if (_parc->_osi->getAutocommit()) { + _routerSession.emplace(opCtx); - startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); - txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); - } + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); - bool supportsWriteConcern = invocation->supportsWriteConcern(); - if (!supportsWriteConcern && - request.body.hasField(WriteConcernOptions::kWriteConcernField)) { - // This command doesn't do writes so it should not be passed a writeConcern. - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern")); - return; - } + auto txnNumber = opCtx->getTxnNumber(); + invariant(txnNumber); - bool clientSuppliedWriteConcern = !wc.usedDefault; - bool customDefaultWriteConcernWasApplied = false; - - if (supportsWriteConcern && !clientSuppliedWriteConcern && - (!TransactionRouter::get(opCtx) || isTransactionCommand(commandName))) { - // This command supports WC, but wasn't given one - so apply the default, if there is - // one. - if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) - .getDefaultWriteConcern(opCtx)) { - wc = *wcDefault; - customDefaultWriteConcernWasApplied = true; - LOGV2_DEBUG(22766, - 2, - "Applying default writeConcern on {command} of {writeConcern}", - "Applying default writeConcern on command", - "command"_attr = request.getCommandName(), - "writeConcern"_attr = *wcDefault); + auto transactionAction = ([&] { + auto startTxnSetting = _parc->_osi->getStartTransaction(); + if (startTxnSetting && *startTxnSetting) { + return TransactionRouter::TransactionActions::kStart; } - } - if (TransactionRouter::get(opCtx)) { - validateWriteConcernForTransaction(wc, commandName); - } - - if (supportsWriteConcern) { - auto& provenance = wc.getProvenance(); - - // ClientSupplied is the only provenance that clients are allowed to pass to mongos. 
- if (provenance.hasSource() && !provenance.isClientSupplied()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status{ErrorCodes::InvalidOptions, - "writeConcern provenance must be unset or \"{}\""_format( - ReadWriteConcernProvenance::kClientSupplied)}); - return; + if (command->getName() == CommitTransaction::kCommandName) { + return TransactionRouter::TransactionActions::kCommit; } - // If the client didn't provide a provenance, then an appropriate value needs to be - // determined. - if (!provenance.hasSource()) { - if (clientSuppliedWriteConcern) { - provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); - } else if (customDefaultWriteConcernWasApplied) { - provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); - } else { - provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); - } - } + return TransactionRouter::TransactionActions::kContinue; + })(); - // Ensure that the WC being set on the opCtx has provenance. - invariant(wc.getProvenance().hasSource(), - str::stream() - << "unexpected unset provenance on writeConcern: " << wc.toBSON()); + startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); + } - opCtx->setWriteConcern(wc); - } + bool supportsWriteConcern = invocation->supportsWriteConcern(); + if (!supportsWriteConcern && request.body.hasField(WriteConcernOptions::kWriteConcernField)) { + // This command doesn't do writes so it should not be passed a writeConcern. + const auto errorMsg = "Command does not support writeConcern"; + return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg}); + } - bool clientSuppliedReadConcern = readConcernArgs.isSpecified(); - bool customDefaultReadConcernWasApplied = false; - - auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); - if (readConcernSupport.defaultReadConcernPermit.isOK() && - (startTransaction || !TransactionRouter::get(opCtx))) { - if (readConcernArgs.isEmpty()) { - const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) - .getDefaultReadConcern(opCtx); - if (rcDefault) { - { - // We must obtain the client lock to set ReadConcernArgs, because it's an - // in-place reference to the object on the operation context, which may be - // concurrently used elsewhere (eg. read by currentOp). - stdx::lock_guard<Client> lk(*opCtx->getClient()); - readConcernArgs = std::move(*rcDefault); - } - customDefaultReadConcernWasApplied = true; - LOGV2_DEBUG(22767, - 2, - "Applying default readConcern on {command} of {readConcern}", - "Applying default readConcern on command", - "command"_attr = invocation->definition()->getName(), - "readConcern"_attr = *rcDefault); - // Update the readConcernSupport, since the default RC was applied. - readConcernSupport = - invocation->supportsReadConcern(readConcernArgs.getLevel()); - } - } + bool clientSuppliedWriteConcern = !_parc->_wc->usedDefault; + bool customDefaultWriteConcernWasApplied = false; + + if (supportsWriteConcern && !clientSuppliedWriteConcern && + (!TransactionRouter::get(opCtx) || isTransactionCommand(_parc->_commandName))) { + // This command supports WC, but wasn't given one - so apply the default, if there is one. 
+ if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) + .getDefaultWriteConcern(opCtx)) { + _parc->_wc = *wcDefault; + customDefaultWriteConcernWasApplied = true; + LOGV2_DEBUG(22766, + 2, + "Applying default writeConcern on {command} of {writeConcern}", + "Applying default writeConcern on command", + "command"_attr = request.getCommandName(), + "writeConcern"_attr = *wcDefault); } + } + + if (TransactionRouter::get(opCtx)) { + validateWriteConcernForTransaction(*_parc->_wc, _parc->_commandName); + } - auto& provenance = readConcernArgs.getProvenance(); + if (supportsWriteConcern) { + auto& provenance = _parc->_wc->getProvenance(); // ClientSupplied is the only provenance that clients are allowed to pass to mongos. if (provenance.hasSource() && !provenance.isClientSupplied()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - Status{ErrorCodes::InvalidOptions, - "readConcern provenance must be unset or \"{}\""_format( - ReadWriteConcernProvenance::kClientSupplied)}); - return; + const auto errorMsg = "writeConcern provenance must be unset or \"{}\""_format( + ReadWriteConcernProvenance::kClientSupplied); + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); } // If the client didn't provide a provenance, then an appropriate value needs to be // determined. if (!provenance.hasSource()) { - // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs - // as it may be concurrently read by CurrentOp. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - if (clientSuppliedReadConcern) { + if (clientSuppliedWriteConcern) { provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); - } else if (customDefaultReadConcernWasApplied) { + } else if (customDefaultWriteConcernWasApplied) { provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); } else { provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); } } - // Ensure that the RC on the opCtx has provenance. - invariant(readConcernArgs.getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on readConcern: " - << readConcernArgs.toBSONInner()); - - // If we are starting a transaction, we only need to check whether the read concern is - // appropriate for running a transaction. There is no need to check whether the specific - // command supports the read concern, because all commands that are allowed to run in a - // transaction must support all applicable read concerns. - if (startTransaction) { - if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - {ErrorCodes::InvalidOptions, - "The readConcern level must be either 'local' (default), 'majority' or " - "'snapshot' in order to run in a transaction"}); - return; - } - if (readConcernArgs.getArgsOpTime()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - {ErrorCodes::InvalidOptions, - str::stream() - << "The readConcern cannot specify '" - << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"}); - return; - } - } + // Ensure that the WC being set on the opCtx has provenance. 
+ invariant(_parc->_wc->getProvenance().hasSource(), + "unexpected unset provenance on writeConcern: {}"_format( + _parc->_wc->toBSON().toString())); - // Otherwise, if there is a read concern present - either user-specified or the default - - // then check whether the command supports it. If there is no explicit read concern level, - // then it is implicitly "local". There is no need to check whether this is supported, - // because all commands either support "local" or upconvert the absent readConcern to a - // stronger level that they do support; e.g. $changeStream upconverts to RC "majority". - // - // Individual transaction statements are checked later on, after we've unstashed the - // transaction resources. - if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) { - if (!readConcernSupport.readConcernSupport.isOK()) { - auto responseBuilder = replyBuilder->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - responseBuilder, - readConcernSupport.readConcernSupport.withContext( - str::stream() << "Command " << invocation->definition()->getName() - << " does not support " << readConcernArgs.toString())); - return; + opCtx->setWriteConcern(*_parc->_wc); + } + + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + bool clientSuppliedReadConcern = readConcernArgs.isSpecified(); + bool customDefaultReadConcernWasApplied = false; + + auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); + if (readConcernSupport.defaultReadConcernPermit.isOK() && + (startTransaction || !TransactionRouter::get(opCtx))) { + if (readConcernArgs.isEmpty()) { + const auto rcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) + .getDefaultReadConcern(opCtx); + if (rcDefault) { + { + // We must obtain the client lock to set ReadConcernArgs, because it's an + // in-place reference to the object on the operation context, which may be + // concurrently used elsewhere (eg. read by currentOp). + stdx::lock_guard<Client> lk(*opCtx->getClient()); + readConcernArgs = std::move(*rcDefault); + } + customDefaultReadConcernWasApplied = true; + LOGV2_DEBUG(22767, + 2, + "Applying default readConcern on {command} of {readConcern}", + "Applying default readConcern on command", + "command"_attr = invocation->definition()->getName(), + "readConcern"_attr = *rcDefault); + // Update the readConcernSupport, since the default RC was applied. + readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); } } + } - // Remember whether or not this operation is starting a transaction, in case something later - // in the execution needs to adjust its behavior based on this. - opCtx->setIsStartingMultiDocumentTransaction(startTransaction); + auto& provenance = readConcernArgs.getProvenance(); - command->incrementCommandsExecuted(); + // ClientSupplied is the only provenance that clients are allowed to pass to mongos. + if (provenance.hasSource() && !provenance.isClientSupplied()) { + const auto errorMsg = "readConcern provenance must be unset or \"{}\""_format( + ReadWriteConcernProvenance::kClientSupplied); + return appendStatusToReplyAndSkipCommandExecution({ErrorCodes::InvalidOptions, errorMsg}); + } + + // If the client didn't provide a provenance, then an appropriate value needs to be determined. + if (!provenance.hasSource()) { + // We must obtain the client lock to set the provenance of the opCtx's ReadConcernArgs as it + // may be concurrently read by CurrentOp. 
+ stdx::lock_guard<Client> lk(*opCtx->getClient()); + if (clientSuppliedReadConcern) { + provenance.setSource(ReadWriteConcernProvenance::Source::clientSupplied); + } else if (customDefaultReadConcernWasApplied) { + provenance.setSource(ReadWriteConcernProvenance::Source::customDefault); + } else { + provenance.setSource(ReadWriteConcernProvenance::Source::implicitDefault); + } + } - auto shouldAffectCommandCounter = command->shouldAffectCommandCounter(); + // Ensure that the RC on the opCtx has provenance. + invariant(readConcernArgs.getProvenance().hasSource(), + "unexpected unset provenance on readConcern: {}"_format( + readConcernArgs.toBSONInner().toString())); + + // If we are starting a transaction, we only need to check whether the read concern is + // appropriate for running a transaction. There is no need to check whether the specific command + // supports the read concern, because all commands that are allowed to run in a transaction must + // support all applicable read concerns. + if (startTransaction) { + if (!isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())) { + const auto errorMsg = + "The readConcern level must be either 'local' (default), 'majority' or " + "'snapshot' in order to run in a transaction"; + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); + } + if (readConcernArgs.getArgsOpTime()) { + const std::string errorMsg = + "The readConcern cannot specify '{}' in a transaction"_format( + repl::ReadConcernArgs::kAfterOpTimeFieldName); + return appendStatusToReplyAndSkipCommandExecution( + {ErrorCodes::InvalidOptions, errorMsg}); + } + } - if (shouldAffectCommandCounter) { - globalOpCounters.gotCommand(); + // Otherwise, if there is a read concern present - either user-specified or the default - then + // check whether the command supports it. If there is no explicit read concern level, then it is + // implicitly "local". There is no need to check whether this is supported, because all commands + // either support "local" or upconvert the absent readConcern to a stronger level that they do + // support; e.g. $changeStream upconverts to RC "majority". + // + // Individual transaction statements are checked later on, after we've unstashed the transaction + // resources. + if (!TransactionRouter::get(opCtx) && readConcernArgs.hasLevel()) { + if (!readConcernSupport.readConcernSupport.isOK()) { + const std::string errorMsg = "Command {} does not support {}"_format( + invocation->definition()->getName(), readConcernArgs.toString()); + return appendStatusToReplyAndSkipCommandExecution( + readConcernSupport.readConcernSupport.withContext(errorMsg)); } + } - ON_BLOCK_EXIT([opCtx, shouldAffectCommandCounter] { - if (shouldAffectCommandCounter) { - Grid::get(opCtx)->catalogCache()->checkAndRecordOperationBlockedByRefresh( - opCtx, mongo::LogicalOp::opCommand); - } - }); + // Remember whether or not this operation is starting a transaction, in case something later in + // the execution needs to adjust its behavior based on this. + opCtx->setIsStartingMultiDocumentTransaction(startTransaction); + command->incrementCommandsExecuted(); - for (int tries = 0;; ++tries) { - // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. 
- bool canRetry = tries < kMaxNumStaleVersionRetries - 1; + if (command->shouldAffectCommandCounter()) { + globalOpCounters.gotCommand(); + _shouldAffectCommandCounter = true; + } - if (tries > 0) { - // Re-parse before retrying in case the process of run()-ning the - // invocation could affect the parsed result. - invocation = command->parse(opCtx, request); - invariant(invocation->ns().toString() == ns, - "unexpected change of namespace when retrying"); - } + return Status::OK(); +} - // On each try, select the latest known clusterTime as the atClusterTime for snapshot - // reads outside of transactions. - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && - !TransactionRouter::get(opCtx) && - (!readConcernArgs.getArgsAtClusterTime() || - readConcernArgs.wasAtClusterTimeSelected())) { - auto atClusterTime = [&] { - const auto latestKnownTime = VectorClock::get(opCtx)->getTime(); - // Choose a time after the user-supplied afterClusterTime. - auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime(); - if (afterClusterTime && *afterClusterTime > latestKnownTime.clusterTime()) { - return afterClusterTime->asTimestamp(); - } - return latestKnownTime.clusterTime().asTimestamp(); - }(); - readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); +void ParseAndRunCommand::RunAndRetry::_setup() { + auto opCtx = _parc->_rec->getOpCtx(); + const auto command = _parc->_rec->getCommand(); + const auto& request = _parc->_rec->getRequest(); + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + + if (_tries > 1) { + // Re-parse before retrying in case the process of run()-ning the invocation could affect + // the parsed result. + _parc->_invocation = command->parse(opCtx, request); + invariant(_parc->_invocation->ns().toString() == _parc->_ns, + "unexpected change of namespace when retrying"); + } + + // On each try, select the latest known clusterTime as the atClusterTime for snapshot reads + // outside of transactions. + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && + !TransactionRouter::get(opCtx) && + (!readConcernArgs.getArgsAtClusterTime() || readConcernArgs.wasAtClusterTimeSelected())) { + auto atClusterTime = [](OperationContext* opCtx, ReadConcernArgs& readConcernArgs) { + const auto latestKnownTime = VectorClock::get(opCtx)->getTime(); + // Choose a time after the user-supplied afterClusterTime. 
+ auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime(); + if (afterClusterTime && *afterClusterTime > latestKnownTime.clusterTime()) { + return afterClusterTime->asTimestamp(); } + return latestKnownTime.clusterTime().asTimestamp(); + }(opCtx, readConcernArgs); + readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); + } - replyBuilder->reset(); - try { - execCommandClient(opCtx, invocation.get(), request, replyBuilder); + _parc->_rec->getReplyBuilder()->reset(); +} - auto responseBuilder = replyBuilder->getBodyBuilder(); - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter.appendRecoveryToken(&responseBuilder); - } +Future<void> ParseAndRunCommand::RunAndRetry::_run() { + auto ecc = std::make_shared<ExecCommandClient>(_parc->_rec, _parc->_invocation); + return ecc->run().then([rec = _parc->_rec] { + auto opCtx = rec->getOpCtx(); + auto responseBuilder = rec->getReplyBuilder()->getBodyBuilder(); + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.appendRecoveryToken(&responseBuilder); + } + }); +} - return; - } catch (ShardInvalidatedForTargetingException& ex) { - auto catalogCache = Grid::get(opCtx)->catalogCache(); - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } +void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) { + // Retry logic specific to transactions. Throws and aborts the transaction if the error cannot + // be retried on. + auto opCtx = _parc->_rec->getOpCtx(); + auto txnRouter = TransactionRouter::get(opCtx); + if (!txnRouter) + return; - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - (void)catalogCache->getCollectionRoutingInfoWithRefresh( - opCtx, ex.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss()); - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto abortGuard = makeGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); }); - // The error is retryable, so update transaction state before retrying. - txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + if (!_canRetry()) { + addContextForTransactionAbortingError( + txnRouter.txnIdToString(), txnRouter.getLatestStmtId(), status, "exhausted retries"); + iassert(status); + } - abortGuard.dismiss(); - continue; - } + // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, or shard + // invalidated for targeting errors. 
+ if (ErrorCodes::isA<ErrorCategory::SnapshotError>(status)) { + if (!txnRouter.canContinueOnSnapshotError()) { + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + status, + "a non-retryable snapshot error"); + iassert(status); + } - if (canRetry) { - continue; - } - throw; - } catch (ExceptionForCat<ErrorCategory::NeedRetargettingError>& ex) { - const auto staleNs = [&] { - if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { - return staleInfo->getNss(); - } - throw; - }(); + // The error is retryable, so update transaction state before retrying. + txnRouter.onSnapshotError(opCtx, status); + } else { + invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status) || + status.code() == ErrorCodes::ShardInvalidatedForTargeting || + status.code() == ErrorCodes::StaleDbVersion); + if (!txnRouter.canContinueOnStaleShardOrDbError(_parc->_commandName, status)) { + if (status.code() == ErrorCodes::ShardInvalidatedForTargeting) { auto catalogCache = Grid::get(opCtx)->catalogCache(); - if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { - catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); - } else { - // If we don't have the stale config info and therefore don't know the shard's - // id, we have to force all further targetting requests for the namespace to - // block on a refresh. - catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs); - } + (void)catalogCache->getCollectionRoutingInfoWithRefresh( + opCtx, status.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss()); + } + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + status, + "an error from cluster data placement change"); + iassert(status); + } - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); + // The error is retryable, so update transaction state before retrying. + txnRouter.onStaleShardOrDbError(opCtx, _parc->_commandName, status); + } - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); + abortGuard.dismiss(); +} - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } +void ParseAndRunCommand::RunAndRetry::_onShardInvalidatedForTargeting(Status& status) { + invariant(status.code() == ErrorCodes::ShardInvalidatedForTargeting); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto opCtx = _parc->_rec->getOpCtx(); + auto catalogCache = Grid::get(opCtx)->catalogCache(); + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - // The error is retryable, so update transaction state before retrying. 
- txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + _checkRetryForTransaction(status); - abortGuard.dismiss(); - continue; - } + if (!_canRetry()) + iassert(status); +} - if (canRetry) { - continue; - } - throw; - } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) { - // Mark database entry in cache as stale. - Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(), - ex->getVersionWanted()); - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } +void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) { + invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status)); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnStaleShardOrDbError(commandName, ex.toStatus())) { - addContextForTransactionAbortingError( - txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "an error from cluster data placement change"); - throw; - } + auto staleInfo = status.extraInfo<StaleConfigInfo>(); + if (!staleInfo) + iassert(status); - // The error is retryable, so update transaction state before retrying. - txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + auto opCtx = _parc->_rec->getOpCtx(); + const auto staleNs = staleInfo->getNss(); + auto catalogCache = Grid::get(opCtx)->catalogCache(); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); - abortGuard.dismiss(); - continue; - } + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - if (canRetry) { - continue; - } - throw; - } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) { - // Simple retry on any type of snapshot error. - - // Retry logic specific to transactions. Throws and aborts the transaction if the - // error cannot be retried on. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - auto abortGuard = makeGuard( - [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); - - if (!canRetry) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "exhausted retries"); - throw; - } + _checkRetryForTransaction(status); - // TODO SERVER-39704 Allow mongos to retry on stale shard, stale db, snapshot, - // or shard invalidated for targeting errors. - if (!txnRouter.canContinueOnSnapshotError()) { - addContextForTransactionAbortingError(txnRouter.txnIdToString(), - txnRouter.getLatestStmtId(), - ex, - "a non-retryable snapshot error"); - throw; - } + if (!_canRetry()) + iassert(status); +} - // The error is retryable, so update transaction state before retrying. - txnRouter.onSnapshotError(opCtx, ex.toStatus()); +void ParseAndRunCommand::RunAndRetry::_onStaleDbVersion(Status& status) { + invariant(status.code() == ErrorCodes::StaleDbVersion); + auto opCtx = _parc->_rec->getOpCtx(); - abortGuard.dismiss(); - continue; - } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) { - // Non-transaction snapshot read. 
The client sent readConcern: {level: - // "snapshot", atClusterTime: T}, where T is older than - // minSnapshotHistoryWindowInSeconds, retrying won't succeed. - throw; - } + // Mark database entry in cache as stale. + auto extraInfo = status.extraInfo<StaleDbRoutingVersion>(); + invariant(extraInfo); + Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(extraInfo->getDb(), + extraInfo->getVersionWanted()); - if (canRetry) { - continue; - } - throw; - } - MONGO_UNREACHABLE; - } - } catch (const DBException& e) { - command->incrementCommandsFailed(); - LastError::get(opCtx->getClient()).setLastError(e.code(), e.reason()); - // WriteConcern error (wcCode) is set to boost::none because: - // 1. TransientTransaction error label handling for commitTransaction command in mongos is - // delegated to the shards. Mongos simply propagates the shard's response up to the - // client. - // 2. For other commands in a transaction, they shouldn't get a writeConcern error so - // this setting doesn't apply. - // - // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError - // label. - auto errorLabels = getErrorLabels( - opCtx, osi, command->getName(), e.code(), boost::none, true /* isInternalClient */); - errorBuilder->appendElements(errorLabels); - throw; + _checkRetryForTransaction(status); + + if (!_canRetry()) + iassert(status); +} + +void ParseAndRunCommand::RunAndRetry::_onSnapshotError(Status& status) { + // Simple retry on any type of snapshot error. + invariant(ErrorCodes::isA<ErrorCategory::SnapshotError>(status)); + + _checkRetryForTransaction(status); + + auto opCtx = _parc->_rec->getOpCtx(); + if (auto txnRouter = TransactionRouter::get(opCtx); + !txnRouter && !ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) { + // Non-transaction snapshot read. The client sent readConcern: {level: "snapshot", + // atClusterTime: T}, where T is older than minSnapshotHistoryWindowInSeconds, retrying + // won't succeed. + iassert(status); } + + if (!_canRetry()) + iassert(status); +} + +void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) { + auto opCtx = _parc->_rec->getOpCtx(); + const auto command = _parc->_rec->getCommand(); + + command->incrementCommandsFailed(); + LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason()); + // WriteConcern error (wcCode) is set to boost::none because: + // 1. TransientTransaction error label handling for commitTransaction command in mongos is + // delegated to the shards. Mongos simply propagates the shard's response up to the client. + // 2. For other commands in a transaction, they shouldn't get a writeConcern error so this + // setting doesn't apply. + // + // isInternalClient is set to true to suppress mongos from returning the RetryableWriteError + // label. + auto errorLabels = getErrorLabels(opCtx, + *_parc->_osi, + command->getName(), + status.code(), + boost::none, + true /* isInternalClient */); + _parc->_errorBuilder->appendElements(errorLabels); +} + +Future<void> ParseAndRunCommand::RunInvocation::_runAndRetry() { + auto instance = std::make_shared<RunAndRetry>(_parc); + return instance->run(); +} + +Future<void> ParseAndRunCommand::_runInvocation() { + auto ri = std::make_shared<RunInvocation>(shared_from_this()); + return ri->run(); +} + +Future<void> ParseAndRunCommand::RunAndRetry::run() { + // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. 
+ _tries++; + + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { _setup(); }) + .then([this, anchor = shared_from_this()] { + return _run() + .onError<ErrorCodes::ShardInvalidatedForTargeting>( + [this, anchor = shared_from_this()](Status status) { + _onShardInvalidatedForTargeting(status); + return run(); // Retry + }) + .onErrorCategory<ErrorCategory::NeedRetargettingError>( + [this, anchor = shared_from_this()](Status status) { + _onNeedRetargetting(status); + return run(); // Retry + }) + .onError<ErrorCodes::StaleDbVersion>( + [this, anchor = shared_from_this()](Status status) { + _onStaleDbVersion(status); + return run(); // Retry + }) + .onErrorCategory<ErrorCategory::SnapshotError>( + [this, anchor = shared_from_this()](Status status) { + _onSnapshotError(status); + return run(); // Retry + }); + }); + pf.promise.emplaceValue(); + return future; +} + +Future<void> ParseAndRunCommand::RunInvocation::run() { + auto pf = makePromiseFuture<void>(); + auto future = + std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _setup(); }) + .then([this, anchor = shared_from_this()] { return _runAndRetry(); }) + .tapError([this, anchor = shared_from_this()](Status status) { _tapOnError(status); }); + pf.promise.emplaceValue(); + return future; +} + +Future<void> ParseAndRunCommand::run() { + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _prologue(); }) + .then([this, anchor = shared_from_this()] { return _runInvocation(); }) + .onError([this, anchor = shared_from_this()](Status status) { + if (status.code() == ErrorCodes::SkipCommandExecution) + // We've already skipped execution, so no other action is required. + return Status::OK(); + return status; + }); + pf.promise.emplaceValue(); + return future; } /** - * Attaches the topology version to the response. + * Executes the command for the given request, and appends the result to replyBuilder + * and error labels, if any, to errorBuilder. */ -void attachTopologyVersionDuringShutdown(OperationContext* opCtx, - const DBException& ex, - BSONObjBuilder* errorBuilder) { - // Only attach the topology version if the mongos is in quiesce mode. If the mongos is in - // quiesce mode, this shutdown error is due to mongos rather than a shard. - auto code = ex.code(); - if (code && ErrorCodes::isA<ErrorCategory::ShutdownError>(code)) { - if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx); - mongosTopCoord && mongosTopCoord->inQuiesceMode()) { - // Append the topology version to the response. 
-            const auto topologyVersion = mongosTopCoord->getTopologyVersion();
-            BSONObjBuilder topologyVersionBuilder(errorBuilder->subobjStart("topologyVersion"));
-            topologyVersion.serialize(&topologyVersionBuilder);
-        }
-    }
+Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
+                        std::shared_ptr<BSONObjBuilder> errorBuilder) {
+    auto instance = std::make_shared<ParseAndRunCommand>(std::move(rec), std::move(errorBuilder));
+    return instance->run();
 }
 
 }  // namespace
 
@@ -1024,75 +1190,118 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss
                            cursorId)};
 }
 
-DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
-    auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(m));
-    BSONObjBuilder errorBuilder;
+// Maintains the state required to execute client commands, and provides the interface to construct
+// a future-chain that runs the command against the database.
+class ClientCommand final : public std::enable_shared_from_this<ClientCommand> {
+public:
+    ClientCommand(ClientCommand&&) = delete;
+    ClientCommand(const ClientCommand&) = delete;
 
-    bool propagateException = false;
+    explicit ClientCommand(std::shared_ptr<RequestExecutionContext> rec)
+        : _rec(std::move(rec)), _errorBuilder(std::make_shared<BSONObjBuilder>()) {}
 
-    try {
-        // Parse.
-        OpMsgRequest request = [&] {
-            try {
-                return rpc::opMsgRequestFromAnyProtocol(m);
-            } catch (const DBException& ex) {
-                // If this error needs to fail the connection, propagate it out.
-                if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
-                    propagateException = true;
-
-                LOGV2_DEBUG(22769,
-                            1,
-                            "Exception thrown while parsing command {error}",
-                            "Exception thrown while parsing command",
-                            "error"_attr = redact(ex));
-                throw;
-            }
-        }();
+    // Returns the future-chain that produces the response by parsing and executing the command.
+    Future<DbResponse> run();
 
-        // Execute.
-        std::string db = request.getDatabase().toString();
-        try {
-            LOGV2_DEBUG(22770,
-                        3,
-                        "Command begin db: {db} msg id: {headerId}",
-                        "Command begin",
-                        "db"_attr = db,
-                        "headerId"_attr = m.header().getId());
-            runCommand(opCtx, request, m, reply.get(), &errorBuilder);
+private:
+    void _parse();
+
+    Future<void> _execute();
+
+    // Handler for exceptions thrown during parsing and executing the command.
+    Future<void> _handleException(Status);
+
+    // Extracts the command response from the replyBuilder.
+    DbResponse _produceResponse();
+
+    const std::shared_ptr<RequestExecutionContext> _rec;
+    const std::shared_ptr<BSONObjBuilder> _errorBuilder;
+
+    bool _propagateException = false;
+};
+
+void ClientCommand::_parse() try {
+    const auto& msg = _rec->getMessage();
+    _rec->setReplyBuilder(rpc::makeReplyBuilder(rpc::protocolForMessage(msg)));
+    _rec->setRequest(rpc::opMsgRequestFromAnyProtocol(msg));
+} catch (const DBException& ex) {
+    // If this error needs to fail the connection, propagate it out.
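+    // (The rethrow below surfaces in ClientCommand::run()'s onError continuation,
+    // where _handleException() either propagates the status, failing the
+    // connection, or converts it into an error response for the client.)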
+    if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
+        _propagateException = true;
+
+    LOGV2_DEBUG(22769,
+                1,
+                "Exception thrown while parsing command {error}",
+                "Exception thrown while parsing command",
+                "error"_attr = redact(ex));
+    throw;
+}
+
+Future<void> ClientCommand::_execute() {
+    LOGV2_DEBUG(22770,
+                3,
+                "Command begin db: {db} msg id: {headerId}",
+                "Command begin",
+                "db"_attr = _rec->getRequest().getDatabase().toString(),
+                "headerId"_attr = _rec->getMessage().header().getId());
+
+    return runCommand(_rec, _errorBuilder)
+        .then([this, anchor = shared_from_this()] {
             LOGV2_DEBUG(22771,
                         3,
                         "Command end db: {db} msg id: {headerId}",
                         "Command end",
-                        "db"_attr = db,
-                        "headerId"_attr = m.header().getId());
-        } catch (const DBException& ex) {
+                        "db"_attr = _rec->getRequest().getDatabase().toString(),
+                        "headerId"_attr = _rec->getMessage().header().getId());
+        })
+        .tapError([this, anchor = shared_from_this()](Status status) {
             LOGV2_DEBUG(
                 22772,
                 1,
                 "Exception thrown while processing command on {db} msg id: {headerId} {error}",
                 "Exception thrown while processing command",
-                "db"_attr = db,
-                "headerId"_attr = m.header().getId(),
-                "error"_attr = redact(ex));
+                "db"_attr = _rec->getRequest().getDatabase().toString(),
+                "headerId"_attr = _rec->getMessage().header().getId(),
+                "error"_attr = redact(status));
             // Record the exception in CurOp.
-            CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
-            throw;
-        }
-    } catch (const DBException& ex) {
-        if (propagateException) {
-            throw;
-        }
+            CurOp::get(_rec->getOpCtx())->debug().errInfo = std::move(status);
+        });
+}
+
+Future<void> ClientCommand::_handleException(Status status) {
+    if (_propagateException) {
+        return status;
+    }
+
+    auto opCtx = _rec->getOpCtx();
+    auto reply = _rec->getReplyBuilder();
 
-        reply->reset();
-        auto bob = reply->getBodyBuilder();
-        CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus());
-        appendRequiredFieldsToResponse(opCtx, &bob);
+    reply->reset();
+    auto bob = reply->getBodyBuilder();
+    CommandHelpers::appendCommandStatusNoThrow(bob, status);
+    appendRequiredFieldsToResponse(opCtx, &bob);
 
-        attachTopologyVersionDuringShutdown(opCtx, ex, &errorBuilder);
-        bob.appendElements(errorBuilder.obj());
+    // Only attach the topology version to the response if mongos is in quiesce mode; in that
+    // case the shutdown error is due to mongos itself rather than a shard.
+    if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status)) {
+        if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx);
+            mongosTopCoord && mongosTopCoord->inQuiesceMode()) {
+            // Append the topology version to the response.
+            const auto topologyVersion = mongosTopCoord->getTopologyVersion();
+            BSONObjBuilder topologyVersionBuilder(_errorBuilder->subobjStart("topologyVersion"));
+            topologyVersion.serialize(&topologyVersionBuilder);
+        }
     }
 
+    bob.appendElements(_errorBuilder->obj());
+    return Status::OK();
+}
+
+DbResponse ClientCommand::_produceResponse() {
+    const auto& m = _rec->getMessage();
+    auto reply = _rec->getReplyBuilder();
+
+    if (OpMsg::isFlagSet(m, OpMsg::kMoreToCome)) {
+        return {};  // Don't reply.
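+        // (kMoreToCome on the request marks it as fire-and-forget, e.g. an
+        // unacknowledged write, so the server must not send a response.)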
+    }
 
@@ -1110,6 +1319,24 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
     return dbResponse;
 }
 
+Future<DbResponse> ClientCommand::run() {
+    auto pf = makePromiseFuture<void>();
+    auto future = std::move(pf.future)
+                      .then([this, anchor = shared_from_this()] { _parse(); })
+                      .then([this, anchor = shared_from_this()] { return _execute(); })
+                      .onError([this, anchor = shared_from_this()](Status status) {
+                          return _handleException(std::move(status));
+                      })
+                      .then([this, anchor = shared_from_this()] { return _produceResponse(); });
+    pf.promise.emplaceValue();
+    return future;
+}
+
+Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) {
+    auto instance = std::make_shared<ClientCommand>(std::move(rec));
+    return instance->run();
+}
+
 DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
     const int ntoreturn = dbm->pullInt();
     uassert(
@@ -1230,29 +1457,27 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
     }
 }
 
-void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
-    const auto& msg = dbm->msg();
-    rpc::OpMsgReplyBuilder reply;
-    BSONObjBuilder errorBuilder;
-    runCommand(opCtx,
-               [&]() {
-                   switch (msg.operation()) {
-                       case dbInsert: {
-                           return InsertOp::parseLegacy(msg).serialize({});
-                       }
-                       case dbUpdate: {
-                           return UpdateOp::parseLegacy(msg).serialize({});
-                       }
-                       case dbDelete: {
-                           return DeleteOp::parseLegacy(msg).serialize({});
-                       }
-                       default:
-                           MONGO_UNREACHABLE;
-                   }
-               }(),
-               msg,
-               &reply,
-               &errorBuilder);  // built objects are ignored
+void Strategy::writeOp(std::shared_ptr<RequestExecutionContext> rec) {
+    rec->setRequest([msg = rec->getMessage()]() {
+        switch (msg.operation()) {
+            case dbInsert: {
+                return InsertOp::parseLegacy(msg).serialize({});
+            }
+            case dbUpdate: {
+                return UpdateOp::parseLegacy(msg).serialize({});
+            }
+            case dbDelete: {
+                return DeleteOp::parseLegacy(msg).serialize({});
+            }
+            default:
+                MONGO_UNREACHABLE;
+        }
+    }());
+
+    rec->setReplyBuilder(std::make_unique<rpc::OpMsgReplyBuilder>());
+    runCommand(std::move(rec),
+               std::make_shared<BSONObjBuilder>())  // built objects are ignored
+        .get();
 }
 
 void Strategy::explainFind(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h
index 3807a63a7f6..95f3f8c9449 100644
--- a/src/mongo/s/commands/strategy.h
+++ b/src/mongo/s/commands/strategy.h
@@ -33,6 +33,7 @@
 
 #include "mongo/client/connection_string.h"
 #include "mongo/db/query/explain_options.h"
+#include "mongo/db/request_execution_context.h"
 #include "mongo/s/client/shard.h"
 
 namespace mongo {
@@ -74,7 +75,7 @@ public:
     * with the result from the operation. Doesn't send any response back and does not throw on
     * errors.
     */
-    static void writeOp(OperationContext* opCtx, DbMessage* dbm);
+    static void writeOp(std::shared_ptr<RequestExecutionContext> rec);
 
    /**
    * Executes a command from either OP_QUERY or OP_MSG wire protocols.
@@ -82,7 +83,7 @@ public:
    *
    * Catches StaleConfigException errors and retries the command automatically after refreshing
    * the metadata for the failing namespace.
    */
-    static DbResponse clientCommand(OperationContext* opCtx, const Message& message);
+    static Future<DbResponse> clientCommand(std::shared_ptr<RequestExecutionContext> rec);
 
    /**
    * Helper to run an explain of a find operation on the shards.
    * Fills 'out' with the result of
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 2469445f418..d93bc314e2b 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -29,6 +29,8 @@
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
 
+#include <memory>
+
 #include "mongo/platform/basic.h"
 
 #include "mongo/s/service_entry_point_mongos.h"
@@ -40,12 +42,12 @@
 #include "mongo/db/dbmessage.h"
 #include "mongo/db/lasterror.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/request_execution_context.h"
 #include "mongo/db/service_context.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/message.h"
 #include "mongo/s/cluster_last_error_info.h"
 #include "mongo/s/commands/strategy.h"
-#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 
@@ -60,16 +62,50 @@ BSONObj buildErrReply(const DBException& ex) {
 
 }  // namespace
 
+// Allows for decomposing `handleRequest` into parts and simplifies composing the future-chain.
+struct HandleRequest : public std::enable_shared_from_this<HandleRequest> {
+    struct OpRunnerBase;
 
-Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
-                                                          const Message& message) noexcept try {
-    const int32_t msgId = message.header().getId();
-    const NetworkOp op = message.operation();
+    HandleRequest(OperationContext* opCtx, const Message& message)
+        : rec(std::make_shared<RequestExecutionContext>(opCtx, message)),
+          op(message.operation()),
+          msgId(message.header().getId()),
+          nsString(getNamespaceString(rec->getDbMessage())) {}
+
+    // Prepares the environment for handling the request (e.g., setting up `ClusterLastErrorInfo`).
+    void setupEnvironment();
+
+    // Returns a future that does the heavy lifting of running client commands.
+    Future<DbResponse> handleRequest();
+
+    // Runs on successful execution of the future returned by `handleRequest`.
+    void onSuccess(const DbResponse&);
+
+    // Returns a future-chain to handle the request and prepare the response.
+    Future<DbResponse> run();
+
+    static NamespaceString getNamespaceString(const DbMessage& dbmsg) {
+        if (!dbmsg.messageShouldHaveNs())
+            return {};
+        return NamespaceString(dbmsg.getns());
+    }
+
+    const std::shared_ptr<RequestExecutionContext> rec;
+    const NetworkOp op;
+    const int32_t msgId;
+    const NamespaceString nsString;
+
+    boost::optional<long long> slowMsOverride;
+};
+
+void HandleRequest::setupEnvironment() {
+    using namespace fmt::literals;
+    auto opCtx = rec->getOpCtx();
 
     // This exception will not be returned to the caller, but will be logged and will close the
     // connection
     uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "Message type " << op << " is not supported.",
+            "Message type {} is not supported."_format(op),
             isSupportedRequestNetworkOp(op) &&
                 op != dbCompressed);  // Decompression should be handled above us.
 
@@ -84,116 +120,175 @@ Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCt
     AuthorizationSession::get(opCtx->getClient())->startRequest(opCtx);
     CurOp::get(opCtx)->ensureStarted();
+}
 
-    DbMessage dbm(message);
 
+// The base for various operation runners that handle the request, and often generate a DbResponse.
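+// Concrete runners are chosen per network op in HandleRequest::handleRequest():
+// CommandOpRunner for commands, and QueryOpRunner, GetMoreOpRunner,
+// KillCursorsOpRunner, and WriteOpRunner for the legacy wire-protocol ops.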
+struct HandleRequest::OpRunnerBase {
+    explicit OpRunnerBase(std::shared_ptr<HandleRequest> hr) : hr(std::move(hr)) {}
+    virtual ~OpRunnerBase() = default;
+    virtual Future<DbResponse> run() = 0;
+    const std::shared_ptr<HandleRequest> hr;
+};
+
+struct CommandOpRunner final : public HandleRequest::OpRunnerBase {
+    using HandleRequest::OpRunnerBase::OpRunnerBase;
+    Future<DbResponse> run() override {
+        return Strategy::clientCommand(hr->rec).tap([hr = hr](const DbResponse&) {
+            // The hello/isMaster commands should take at most kMaxAwaitTimeMs; log if one
+            // takes twice that.
+            if (auto command = CurOp::get(hr->rec->getOpCtx())->getCommand();
+                command && (command->getName() == "hello" || command->getName() == "isMaster")) {
+                hr->slowMsOverride =
+                    2 * durationCount<Milliseconds>(SingleServerDiscoveryMonitor::kMaxAwaitTime);
+            }
+        });
+    }
+};
+
+// The base for operations that may throw exceptions, but should not cause the connection to close.
+struct OpRunner : public HandleRequest::OpRunnerBase {
+    using HandleRequest::OpRunnerBase::OpRunnerBase;
+    virtual DbResponse runOperation() = 0;
+    Future<DbResponse> run() override;
+};
+
+Future<DbResponse> OpRunner::run() try {
+    using namespace fmt::literals;
+    const NamespaceString& nss = hr->nsString;
+    const DbMessage& dbm = hr->rec->getDbMessage();
+
+    if (dbm.messageShouldHaveNs()) {
+        uassert(ErrorCodes::InvalidNamespace, "Invalid ns [{}]"_format(nss.ns()), nss.isValid());
+
+        uassert(ErrorCodes::IllegalOperation,
+                "Can't use 'local' database through mongos",
+                nss.db() != NamespaceString::kLocalDb);
+    }
 
-    // This is before the try block since it handles all exceptions that should not cause the
-    // connection to close.
-    if (op == dbMsg || (op == dbQuery && NamespaceString(dbm.getns()).isCommand())) {
-        auto dbResponse = Strategy::clientCommand(opCtx, message);
+    LOGV2_DEBUG(22867,
+                3,
+                "Request::process begin ns: {namespace} msg id: {msgId} op: {operation}",
+                "Starting operation",
+                "namespace"_attr = nss,
+                "msgId"_attr = hr->msgId,
+                "operation"_attr = networkOpToString(hr->op));
 
-        // The hello/isMaster commands should take kMaxAwaitTimeMs at most, log if it takes twice
-        // that.
-        boost::optional<long long> slowMsOverride;
-        if (auto command = CurOp::get(opCtx)->getCommand();
-            command && (command->getName() == "hello" || command->getName() == "isMaster")) {
-            slowMsOverride =
-                2 * durationCount<Milliseconds>(SingleServerDiscoveryMonitor::kMaxAwaitTime);
-        }
+    auto dbResponse = runOperation();
 
-    // Mark the op as complete, populate the response length, and log it if appropriate.
-        CurOp::get(opCtx)->completeAndLogOperation(
-            opCtx, logv2::LogComponent::kCommand, dbResponse.response.size(), slowMsOverride);
+    LOGV2_DEBUG(22868,
+                3,
+                "Request::process end ns: {namespace} msg id: {msgId} op: {operation}",
+                "Done processing operation",
+                "namespace"_attr = nss,
+                "msgId"_attr = hr->msgId,
+                "operation"_attr = networkOpToString(hr->op));
 
-        return Future<DbResponse>::makeReady(std::move(dbResponse));
-    }
+    return Future<DbResponse>::makeReady(std::move(dbResponse));
+} catch (const DBException& ex) {
+    LOGV2_DEBUG(22869,
+                1,
+                "Exception thrown while processing {operation} op for {namespace}: {error}",
+                "Got an error while processing operation",
+                "operation"_attr = networkOpToString(hr->op),
+                "namespace"_attr = hr->nsString.ns(),
+                "error"_attr = ex);
 
-    NamespaceString nss;
     DbResponse dbResponse;
-    try {
-        if (dbm.messageShouldHaveNs()) {
-            nss = NamespaceString(StringData(dbm.getns()));
-
-            uassert(ErrorCodes::InvalidNamespace,
-                    str::stream() << "Invalid ns [" << nss.ns() << "]",
-                    nss.isValid());
-
-            uassert(ErrorCodes::IllegalOperation,
-                    "Can't use 'local' database through mongos",
-                    nss.db() != NamespaceString::kLocalDb);
-        }
-
-
-        LOGV2_DEBUG(22867,
-                    3,
-                    "Request::process begin ns: {namespace} msg id: {msgId} op: {operation}",
-                    "Starting operation",
-                    "namespace"_attr = nss,
-                    "msgId"_attr = msgId,
-                    "operation"_attr = networkOpToString(op));
-
-        switch (op) {
-            case dbQuery:
-                // Commands are handled above through Strategy::clientCommand().
-                invariant(!nss.isCommand());
-                opCtx->markKillOnClientDisconnect();
-                dbResponse = Strategy::queryOp(opCtx, nss, &dbm);
-                break;
-
-            case dbGetMore:
-                dbResponse = Strategy::getMore(opCtx, nss, &dbm);
-                break;
-
-            case dbKillCursors:
-                Strategy::killCursors(opCtx, &dbm);  // No Response.
-                break;
-
-            case dbInsert:
-            case dbUpdate:
-            case dbDelete:
-                Strategy::writeOp(opCtx, &dbm);  // No Response.
-                break;
-
-            default:
-                MONGO_UNREACHABLE;
-        }
-
-        LOGV2_DEBUG(22868,
-                    3,
-                    "Request::process end ns: {namespace} msg id: {msgId} op: {operation}",
-                    "Done processing operation",
-                    "namespace"_attr = nss,
-                    "msgId"_attr = msgId,
-                    "operation"_attr = networkOpToString(op));
-
-    } catch (const DBException& ex) {
-        LOGV2_DEBUG(22869,
-                    1,
-                    "Exception thrown while processing {operation} op for {namespace}: {error}",
-                    "Got an error while processing operation",
-                    "operation"_attr = networkOpToString(op),
-                    "namespace"_attr = nss.ns(),
-                    "error"_attr = ex);
-
-        if (op == dbQuery || op == dbGetMore) {
-            dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet);
-        } else {
-            // No Response.
-        }
-
-        // We *always* populate the last error for now
-        LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.what());
-        CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
+    if (hr->op == dbQuery || hr->op == dbGetMore) {
+        dbResponse = replyToQuery(buildErrReply(ex), ResultFlag_ErrSet);
+    } else {
+        // No Response.
+    }
+
+    // We *always* populate the last error for now
+    auto opCtx = hr->rec->getOpCtx();
+    LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.what());
+
+    CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
+
+    return Future<DbResponse>::makeReady(std::move(dbResponse));
+}
+
+struct QueryOpRunner final : public OpRunner {
+    using OpRunner::OpRunner;
+    DbResponse runOperation() override {
+        // Commands are handled through CommandOpRunner and Strategy::clientCommand().
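+        // Only plain OP_QUERY reads reach this runner; a dbQuery message whose
+        // namespace is a command is routed to CommandOpRunner instead. Legacy
+        // queries are marked to be killed on client disconnect, since nobody
+        // will consume the result.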
+        invariant(!hr->nsString.isCommand());
+        hr->rec->getOpCtx()->markKillOnClientDisconnect();
+        return Strategy::queryOp(hr->rec->getOpCtx(), hr->nsString, &hr->rec->getDbMessage());
+    }
+};
+
+struct GetMoreOpRunner final : public OpRunner {
+    using OpRunner::OpRunner;
+    DbResponse runOperation() override {
+        return Strategy::getMore(hr->rec->getOpCtx(), hr->nsString, &hr->rec->getDbMessage());
+    }
+};
+
+struct KillCursorsOpRunner final : public OpRunner {
+    using OpRunner::OpRunner;
+    DbResponse runOperation() override {
+        Strategy::killCursors(hr->rec->getOpCtx(), &hr->rec->getDbMessage());  // No Response.
+        return {};
+    }
+};
+
+struct WriteOpRunner final : public OpRunner {
+    using OpRunner::OpRunner;
+    DbResponse runOperation() override {
+        Strategy::writeOp(hr->rec);  // No Response.
+        return {};
+    }
+};
+
+Future<DbResponse> HandleRequest::handleRequest() {
+    switch (op) {
+        case dbQuery:
+            if (!nsString.isCommand())
+                return std::make_unique<QueryOpRunner>(shared_from_this())->run();
+            // FALLTHROUGH: it's a query containing a command
+        case dbMsg:
+            return std::make_unique<CommandOpRunner>(shared_from_this())->run();
+        case dbGetMore:
+            return std::make_unique<GetMoreOpRunner>(shared_from_this())->run();
+        case dbKillCursors:
+            return std::make_unique<KillCursorsOpRunner>(shared_from_this())->run();
+        case dbInsert:
+        case dbUpdate:
+        case dbDelete:
+            return std::make_unique<WriteOpRunner>(shared_from_this())->run();
+        default:
+            MONGO_UNREACHABLE;
+    }
+}
+
+void HandleRequest::onSuccess(const DbResponse& dbResponse) {
+    auto opCtx = rec->getOpCtx();
     // Mark the op as complete, populate the response length, and log it if appropriate.
     CurOp::get(opCtx)->completeAndLogOperation(
-        opCtx, logv2::LogComponent::kCommand, dbResponse.response.size());
+        opCtx, logv2::LogComponent::kCommand, dbResponse.response.size(), slowMsOverride);
+}
 
-    return Future<DbResponse>::makeReady(std::move(dbResponse));
-} catch (const DBException& e) {
-    LOGV2(4879803, "Failed to handle request", "error"_attr = redact(e));
-    return e.toStatus();
+Future<DbResponse> HandleRequest::run() {
+    auto fp = makePromiseFuture<void>();
+    auto future = std::move(fp.future)
+                      .then([this, anchor = shared_from_this()] { setupEnvironment(); })
+                      .then([this, anchor = shared_from_this()] { return handleRequest(); })
+                      .tap([this, anchor = shared_from_this()](const DbResponse& dbResponse) {
+                          onSuccess(dbResponse);
+                      })
+                      .tapError([](Status status) {
+                          LOGV2(4879803, "Failed to handle request", "error"_attr = redact(status));
+                      });
+    fp.promise.emplaceValue();
+    return future;
+}
+
+Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
+                                                          const Message& message) noexcept {
+    auto hr = std::make_shared<HandleRequest>(opCtx, message);
+    return hr->run();
 }
 
 }  // namespace mongo
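Every run() above follows the same future-chain idiom: per-request state lives in an object owned by a shared_ptr, each continuation captures anchor = shared_from_this() so the state outlives its caller, and the chain is started synchronously by fulfilling the promise from makePromiseFuture<void>(). The following is a minimal, self-contained sketch of that idiom, using hypothetical names (ExampleTask, runExampleTask, _setup, _work) that are not part of this patch, and assuming the Future/Promise utilities behave as they are used above:

#include <memory>

#include "mongo/base/status_with.h"
#include "mongo/util/future.h"

namespace mongo {

// Hypothetical task mirroring the shape of ClientCommand / HandleRequest.
class ExampleTask : public std::enable_shared_from_this<ExampleTask> {
public:
    // Builds the future-chain; each stage keeps `this` alive via the anchor.
    Future<int> run() {
        auto pf = makePromiseFuture<void>();
        auto future = std::move(pf.future)
                          .then([this, anchor = shared_from_this()] { _setup(); })
                          .then([this, anchor = shared_from_this()] { return _work(); })
                          .onError([](Status status) -> StatusWith<int> {
                              // Forward any failure to the caller unchanged.
                              return status;
                          });
        pf.promise.emplaceValue();  // Kicks off the chain on the current thread.
        return future;
    }

private:
    void _setup() {
        _state = 41;
    }
    int _work() const {
        return _state + 1;
    }
    int _state = 0;
};

// Hypothetical entry point mirroring Strategy::clientCommand(): the task must
// be owned by a shared_ptr for shared_from_this() to be valid.
inline Future<int> runExampleTask() {
    auto instance = std::make_shared<ExampleTask>();
    return instance->run();
}

}  // namespace mongo

The anchor capture is the load-bearing detail: it lets ExampleTask (and, in the patch, ClientCommand, ParseAndRunCommand, and HandleRequest) be destroyed only after the last continuation has run, even though run() itself returns immediately.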