From 38f350478add12941539ec6f44034d5ba4443265 Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Sat, 26 Sep 2020 01:43:36 +0000 Subject: SERVER-49107 Futurize and refactor execCommandDatabase() --- src/mongo/base/error_codes.yml | 2 + src/mongo/db/service_entry_point_common.cpp | 719 +++++++++++---------- src/mongo/db/service_entry_point_common.h | 2 +- src/mongo/db/service_entry_point_mongod.cpp | 6 +- .../embedded/service_entry_point_embedded.cpp | 2 +- 5 files changed, 388 insertions(+), 343 deletions(-) diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index f788660569e..5d38573d087 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -401,6 +401,8 @@ error_codes: - {code: 327, name: NoSuchTenantMigration} + - {code: 328, name: SkipCommandExecution} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 085595c229b..d586c76c845 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -150,6 +150,10 @@ struct HandleRequest { return *getOpCtx()->getClient(); } + auto session() const { + return client().session(); + } + NetworkOp op() const { return getMessage().operation(); } @@ -983,390 +987,437 @@ bool runCommandImpl(OperationContext* opCtx, return ok; } -/** - * Executes a command after stripping metadata, performing authorization checks, - * handling audit impersonation, and (potentially) setting maintenance mode. This method - * also checks that the command is permissible to run on the node given its current - * replication state. All the logic here is independent of any particular command; any - * functionality relevant to a specific command should be confined to its run() method. - */ -void execCommandDatabase(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - const ServiceEntryPointCommon::Hooks& behaviors) { - CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); - BSONObjBuilder extraFieldsBuilder; - auto startOperationTime = getClientOperationTime(opCtx); +class ExecCommandDatabase final { +public: + explicit ExecCommandDatabase(std::shared_ptr execContext) + : _isInternalClient(execContext->session() && + execContext->session()->getTags() & + transport::Session::kInternalClient), + _execContext(std::move(execContext)) {} + + /** + * Returns a future that executes a command after stripping metadata, performing authorization + * checks, handling audit impersonation, and (potentially) setting maintenance mode. The future + * also checks that the command is permissible to run on the node given its current replication + * state. All the logic here is independent of any particular command; any functionality + * relevant to a specific command should be confined to its run() method. + */ + static Future run(std::shared_ptr execContext) { + auto opCtx = execContext->getOpCtx(); + auto command = execContext->getCommand(); + auto& request = execContext->getRequest(); + + auto instance = std::make_shared(std::move(execContext)); + + CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); + instance->_startOperationTime = getClientOperationTime(opCtx); + + instance->_invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, instance->_invocation); + + return instance->_initiateCommand() + .then([instance] { return instance->_commandExec(); }) + .onError([instance = std::move(instance)](Status status) mutable { + return instance->_handleFailure(std::move(status)); + }); + } - std::shared_ptr invocation = command->parse(opCtx, request); - CommandInvocation::set(opCtx, invocation); +private: + // Any logic, such as authorization and auditing, that must precede execution of the command. + Future _initiateCommand(); - OperationSessionInfoFromClient sessionOptions; + // Returns the future chain that executes the parsed command against the database. + Future _commandExec(); - const auto isInternalClient = opCtx->getClient()->session() && - (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); + // Any error-handling logic that must be performed if the command initiation/execution fails. + void _handleFailure(Status status); - try { - const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); - Client* client = opCtx->getClient(); + // An indication on whether the command is initiated by an internal client. + const bool _isInternalClient; - { - stdx::lock_guard lk(*client); - CurOp::get(opCtx)->setCommand_inlock(command); - APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient); - } + std::shared_ptr const _execContext; - auto& apiParams = APIParameters::get(opCtx); - auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); - const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata(); - if (clientMetadata) { - auto appName = clientMetadata.get().getApplicationName().toString(); - apiVersionMetrics.update(appName, apiParams); - } + // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share + // execution state without concerning the lifetime of these variables. In particular, `_scoped` + // is a scoped variable that once created, lives as long as its owner (i.e., an instance of + // `ExecCommandDatabase`) lives. + BSONObjBuilder _extraFieldsBuilder; + std::shared_ptr _invocation; + LogicalTime _startOperationTime; + OperationSessionInfoFromClient _sessionOptions; + std::unique_ptr _scoped; +}; - sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) { - auto numMillis = data["millis"].numberInt(); - auto commands = data["commands"].Obj().getFieldNames>(); - // Only sleep for one of the specified commands. - if (commands.find(command->getName()) != commands.end()) { - mongo::sleepmillis(numMillis); - } - }); +Future ExecCommandDatabase::_initiateCommand() try { + auto opCtx = _execContext->getOpCtx(); + auto& request = _execContext->getRequest(); + auto command = _execContext->getCommand(); + auto replyBuilder = _execContext->getReplyBuilder(); + + const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command); + Client* client = opCtx->getClient(); - rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); - rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); + { + stdx::lock_guard lk(*client); + CurOp::get(opCtx)->setCommand_inlock(command); + APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient); + } - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + auto& apiParams = APIParameters::get(opCtx); + auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext()); + const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata(); + if (clientMetadata) { + auto appName = clientMetadata.get().getApplicationName().toString(); + apiVersionMetrics.update(appName, apiParams); + } - sessionOptions = initializeOperationSessionInfo( - opCtx, - request.body, - command->requiresAuth(), - command->attachLogicalSessionsToOpCtx(), - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet); + sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) { + auto numMillis = data["millis"].numberInt(); + auto commands = data["commands"].Obj().getFieldNames>(); + // Only sleep for one of the specified commands. + if (commands.find(command->getName()) != commands.end()) { + mongo::sleepmillis(numMillis); + } + }); - CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get()); + rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); + rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); - const auto dbname = request.getDatabase().toString(); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + + _sessionOptions = initializeOperationSessionInfo(opCtx, + request.body, + command->requiresAuth(), + command->attachLogicalSessionsToOpCtx(), + replCoord->getReplicationMode() == + repl::ReplicationCoordinator::modeReplSet); + + CommandHelpers::evaluateFailCommandFailPoint(opCtx, _invocation.get()); + + const auto dbname = request.getDatabase().toString(); + uassert(ErrorCodes::InvalidNamespace, + fmt::format("Invalid database name: '{}'", dbname), NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); - const auto allowTransactionsOnConfigDatabase = - (serverGlobalParams.clusterRole == ClusterRole::ConfigServer || - serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - validateSessionOptions(sessionOptions, - command->getName(), - invocation->ns(), - allowTransactionsOnConfigDatabase); - - std::unique_ptr mmSetter; - - BSONElement cmdOptionMaxTimeMSField; - BSONElement maxTimeMSOpOnlyField; - BSONElement allowImplicitCollectionCreationField; - BSONElement helpField; - - StringMap topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { - cmdOptionMaxTimeMSField = element; - } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) { - uassert(ErrorCodes::InvalidOptions, - "Can not specify maxTimeMSOpOnly for non internal clients", - isInternalClient); - maxTimeMSOpOnlyField = element; - } else if (fieldName == "allowImplicitCollectionCreation") { - allowImplicitCollectionCreationField = element; - } else if (fieldName == CommandHelpers::kHelpFieldName) { - helpField = element; - } else if (fieldName == "comment") { - opCtx->setComment(element.wrap()); - } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { - uasserted(ErrorCodes::InvalidOptions, - "no such command option $maxTimeMs; use maxTimeMS instead"); - } + const auto allowTransactionsOnConfigDatabase = + (serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + serverGlobalParams.clusterRole == ClusterRole::ShardServer); - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } + validateSessionOptions( + _sessionOptions, command->getName(), _invocation->ns(), allowTransactionsOnConfigDatabase); - if (CommandHelpers::isHelpRequest(helpField)) { - CurOp::get(opCtx)->ensureStarted(); - // We disable last-error for help requests due to SERVER-11492, because config - // servers use help requests to determine which commands are database writes, and so - // must be forwarded to all config servers. - LastError::get(opCtx->getClient()).disable(); - Command::generateHelpResponse(opCtx, replyBuilder, *command); - return; + std::unique_ptr mmSetter; + + BSONElement cmdOptionMaxTimeMSField; + BSONElement maxTimeMSOpOnlyField; + BSONElement allowImplicitCollectionCreationField; + BSONElement helpField; + + StringMap topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { + cmdOptionMaxTimeMSField = element; + } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) { + uassert(ErrorCodes::InvalidOptions, + "Can not specify maxTimeMSOpOnly for non internal clients", + _isInternalClient); + maxTimeMSOpOnlyField = element; + } else if (fieldName == "allowImplicitCollectionCreation") { + allowImplicitCollectionCreationField = element; + } else if (fieldName == CommandHelpers::kHelpFieldName) { + helpField = element; + } else if (fieldName == "comment") { + opCtx->setComment(element.wrap()); + } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { + uasserted(ErrorCodes::InvalidOptions, + "no such command option $maxTimeMs; use maxTimeMS instead"); } - ImpersonationSessionGuard guard(opCtx); - invocation->checkAuthorization(opCtx, request); - - const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); - - if (!opCtx->getClient()->isInDirectClient() && - !MONGO_unlikely(skipCheckingForNotMasterInCommandDispatch.shouldFail())) { - const bool inMultiDocumentTransaction = (sessionOptions.getAutocommit() == false); - auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); - bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; - bool couldHaveOptedIn = - allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction; - bool optedIn = - couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); - bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction); - if (!canRunHere && couldHaveOptedIn) { - uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, "not master and slaveOk=false"); - } + uassert(ErrorCodes::FailedToParse, + str::stream() << "Parsed command object contains duplicate top level key: " + << fieldName, + topLevelFields[fieldName]++ == 0); + } - if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) { - uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere); - } else { - uassert(ErrorCodes::NotWritablePrimary, "not master", canRunHere); - } + if (CommandHelpers::isHelpRequest(helpField)) { + CurOp::get(opCtx)->ensureStarted(); + // We disable last-error for help requests due to SERVER-11492, because config servers use + // help requests to determine which commands are database writes, and so must be forwarded + // to all config servers. + LastError::get(opCtx->getClient()).disable(); + Command::generateHelpResponse(opCtx, replyBuilder, *command); + return Status(ErrorCodes::SkipCommandExecution, + "Skipping command execution for help request"); + } - if (!command->maintenanceOk() && - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && - !replCoord->getMemberState().secondary()) { - - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is recovering", - !replCoord->getMemberState().recovering()); - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is not in primary or recovering state", - replCoord->getMemberState().primary()); - // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode - uassert(ErrorCodes::NotPrimaryOrSecondary, - "node is in drain mode", - optedIn || alwaysAllowed); - } + ImpersonationSessionGuard guard(opCtx); + _invocation->checkAuthorization(opCtx, request); + + const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); + + if (!opCtx->getClient()->isInDirectClient() && + !MONGO_unlikely(skipCheckingForNotMasterInCommandDispatch.shouldFail())) { + const bool inMultiDocumentTransaction = (_sessionOptions.getAutocommit() == false); + auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); + bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; + bool couldHaveOptedIn = + allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction; + bool optedIn = couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); + bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction); + if (!canRunHere && couldHaveOptedIn) { + uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, "not master and slaveOk=false"); } - if (command->adminOnly()) { - LOGV2_DEBUG(21961, - 2, - "Admin only command: {command}", - "Admin only command", - "command"_attr = request.getCommandName()); + if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) { + uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere); + } else { + uassert(ErrorCodes::NotWritablePrimary, "not master", canRunHere); } - if (command->maintenanceMode()) { - mmSetter.reset(new MaintenanceModeSetter(opCtx)); + if (!command->maintenanceOk() && + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && + !replCoord->getMemberState().secondary()) { + + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is recovering", + !replCoord->getMemberState().recovering()); + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is not in primary or recovering state", + replCoord->getMemberState().primary()); + // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode + uassert(ErrorCodes::NotPrimaryOrSecondary, + "node is in drain mode", + optedIn || alwaysAllowed); } + } - if (command->shouldAffectCommandCounter()) { - OpCounters* opCounters = &globalOpCounters; - opCounters->gotCommand(); - } + if (command->adminOnly()) { + LOGV2_DEBUG(21961, + 2, + "Admin only command: {command}", + "Admin only command", + "command"_attr = request.getCommandName()); + } - // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation - // on the OperationContext. The 'maxTimeMS' option unfortunately has a different meaning - // for a getMore command, where it is used to communicate the maximum time to wait for - // new inserts on tailable cursors, not as a deadline for the operation. - // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will - // require introducing a new 'max await time' parameter for getMore, and eventually - // banning maxTimeMS altogether on a getMore command. - int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); - int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField)); - - // The "hello" command should not inherit the deadline from the user op it is operating as a - // part of as that can interfere with replica set monitoring and host selection. - bool ignoreMaxTimeMSOpOnly = command->getName() == "hello"_sd; - - if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && - command->getLogicalOp() != LogicalOp::opGetMore) { - uassert(40119, - "Illegal attempt to set operation deadline within DBDirectClient", - !opCtx->getClient()->isInDirectClient()); - if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 && - (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) { - opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS}); - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly}, - ErrorCodes::MaxTimeMSExpired); - } else if (maxTimeMS > 0) { - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired); - } + if (command->maintenanceMode()) { + mmSetter.reset(new MaintenanceModeSetter(opCtx)); + } + + if (command->shouldAffectCommandCounter()) { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + } + + // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on the + // OperationContext. The 'maxTimeMS' option unfortunately has a different meaning for a getMore + // command, where it is used to communicate the maximum time to wait for new inserts on tailable + // cursors, not as a deadline for the operation. + // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will require + // introducing a new 'max await time' parameter for getMore, and eventually banning maxTimeMS + // altogether on a getMore command. + int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); + int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField)); + + // The "hello" command should not inherit the deadline from the user op it is operating as a + // part of as that can interfere with replica set monitoring and host selection. + bool ignoreMaxTimeMSOpOnly = command->getName() == "hello"_sd; + + if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && command->getLogicalOp() != LogicalOp::opGetMore) { + uassert(40119, + "Illegal attempt to set operation deadline within DBDirectClient", + !opCtx->getClient()->isInDirectClient()); + if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 && + (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) { + opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS}); + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly}, + ErrorCodes::MaxTimeMSExpired); + } else if (maxTimeMS > 0) { + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired); } + } - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - // If the parent operation runs in a transaction, we don't override the read concern. - auto skipReadConcern = - opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction(); - bool startTransaction = static_cast(sessionOptions.getStartTransaction()); - if (!skipReadConcern) { - auto newReadConcernArgs = uassertStatusOK(_extractReadConcern( - opCtx, invocation.get(), request.body, startTransaction, isInternalClient)); + // If the parent operation runs in a transaction, we don't override the read concern. + auto skipReadConcern = + opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction(); + bool startTransaction = static_cast(_sessionOptions.getStartTransaction()); + if (!skipReadConcern) { + auto newReadConcernArgs = uassertStatusOK(_extractReadConcern( + opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient)); - // Ensure that the RC being set on the opCtx has provenance. - invariant(newReadConcernArgs.getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on readConcern: " - << newReadConcernArgs.toBSONInner()); + // Ensure that the RC being set on the opCtx has provenance. + invariant(newReadConcernArgs.getProvenance().hasSource(), + str::stream() << "unexpected unset provenance on readConcern: " + << newReadConcernArgs.toBSONInner()); - uassert(ErrorCodes::InvalidOptions, - "Only the first command in a transaction may specify a readConcern", - startTransaction || !opCtx->inMultiDocumentTransaction() || - newReadConcernArgs.isEmpty()); - - { - // We must obtain the client lock to set the ReadConcernArgs on the operation - // context as it may be concurrently read by CurrentOp. - stdx::lock_guard lk(*opCtx->getClient()); - readConcernArgs = std::move(newReadConcernArgs); - } - } + uassert(ErrorCodes::InvalidOptions, + "Only the first command in a transaction may specify a readConcern", + startTransaction || !opCtx->inMultiDocumentTransaction() || + newReadConcernArgs.isEmpty()); - if (startTransaction) { - opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true); - opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + { + // We must obtain the client lock to set the ReadConcernArgs on the operation context as + // it may be concurrently read by CurrentOp. + stdx::lock_guard lk(*opCtx->getClient()); + readConcernArgs = std::move(newReadConcernArgs); } + } - if (opCtx->inMultiDocumentTransaction() && !startTransaction) { - uassert(4937700, - "API parameters are only allowed in the first command of a multi-document " - "transaction", - !APIParameters::get(opCtx).getParamsPassed()); - } + if (startTransaction) { + opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + } - // Remember whether or not this operation is starting a transaction, in case something - // later in the execution needs to adjust its behavior based on this. - opCtx->setIsStartingMultiDocumentTransaction(startTransaction); + if (opCtx->inMultiDocumentTransaction() && !startTransaction) { + uassert(4937700, + "API parameters are only allowed in the first command of a multi-document " + "transaction", + !APIParameters::get(opCtx).getParamsPassed()); + } - auto& oss = OperationShardingState::get(opCtx); + // Remember whether or not this operation is starting a transaction, in case something later in + // the execution needs to adjust its behavior based on this. + opCtx->setIsStartingMultiDocumentTransaction(startTransaction); - if (!opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && - (iAmPrimary || - (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body); + auto& oss = OperationShardingState::get(opCtx); - auto const shardingState = ShardingState::get(opCtx); - if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { - uassertStatusOK(shardingState->canAcceptShardedCommands()); - } + if (!opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && + (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { + oss.initializeClientRoutingVersionsFromCommand(_invocation->ns(), request.body); - behaviors.advanceConfigOpTimeFromRequestMetadata(opCtx); + auto const shardingState = ShardingState::get(opCtx); + if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { + uassertStatusOK(shardingState->canAcceptShardedCommands()); } - oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); - auto scoped = behaviors.scopedOperationCompletionShardingActions(opCtx); + _execContext->behaviors->advanceConfigOpTimeFromRequestMetadata(opCtx); + } - // This may trigger the maxTimeAlwaysTimeOut failpoint. - auto status = opCtx->checkForInterruptNoAssert(); + oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); + _scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx); - // We still proceed if the primary stepped down, but accept other kinds of - // interruptions. We defer to individual commands to allow themselves to be - // interruptible by stepdowns, since commands like 'voteRequest' should conversely - // continue executing. - if (status != ErrorCodes::PrimarySteppedDown && - status != ErrorCodes::InterruptedDueToReplStateChange) { - uassertStatusOK(status); - } + // This may trigger the maxTimeAlwaysTimeOut failpoint. + auto status = opCtx->checkForInterruptNoAssert(); + // We still proceed if the primary stepped down, but accept other kinds of interruptions. We + // defer to individual commands to allow themselves to be interruptible by stepdowns, since + // commands like 'voteRequest' should conversely continue executing. + if (status != ErrorCodes::PrimarySteppedDown && + status != ErrorCodes::InterruptedDueToReplStateChange) { + uassertStatusOK(status); + } - CurOp::get(opCtx)->ensureStarted(); + CurOp::get(opCtx)->ensureStarted(); - command->incrementCommandsExecuted(); - - if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) && - rpc::TrackingMetadata::get(opCtx).getParentOperId()) { - LOGV2_DEBUG_OPTIONS(4615605, - 1, - {logv2::LogComponent::kTracking}, - "Command metadata: {trackingMetadata}", - "Command metadata", - "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx)); - rpc::TrackingMetadata::get(opCtx).setIsLogged(true); - } + command->incrementCommandsExecuted(); - behaviors.waitForReadConcern(opCtx, invocation.get(), request); - behaviors.setPrepareConflictBehaviorForReadConcern(opCtx, invocation.get()); + if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) && + rpc::TrackingMetadata::get(opCtx).getParentOperId()) { + LOGV2_DEBUG_OPTIONS(4615605, + 1, + {logv2::LogComponent::kTracking}, + "Command metadata: {trackingMetadata}", + "Command metadata", + "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx)); + rpc::TrackingMetadata::get(opCtx).setIsLogged(true); + } - try { - if (!runCommandImpl(opCtx, - invocation.get(), - request, - replyBuilder, - startOperationTime, - behaviors, - &extraFieldsBuilder, - sessionOptions)) { - command->incrementCommandsFailed(); - } - } catch (const DBException& e) { - command->incrementCommandsFailed(); - if (e.code() == ErrorCodes::Unauthorized) { - CommandHelpers::auditLogAuthEvent(opCtx, invocation.get(), request, e.code()); - } - throw; - } - } catch (const DBException& e) { - behaviors.handleException(e, opCtx); + _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request); + _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get()); + return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); +} - // Append the error labels for transient transaction errors. - auto response = extraFieldsBuilder.asTempObj(); - boost::optional wcCode; - if (response.hasField("writeConcernError")) { - wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); - } - appendErrorLabelsAndTopologyVersion(opCtx, - &extraFieldsBuilder, - sessionOptions, - command->getName(), - e.code(), - wcCode, - isInternalClient); - - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadata(opCtx, request, &metadataBob); - - // The read concern may not have yet been placed on the operation context, so attempt to - // parse it here, so if it is valid it can be used to compute the proper operationTime. - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.isEmpty()) { - auto readConcernArgsStatus = _extractReadConcern(opCtx, - invocation.get(), - request.body, - false /*startTransaction*/, - isInternalClient); - if (readConcernArgsStatus.isOK()) { - // We must obtain the client lock to set the ReadConcernArgs on the operation - // context as it may be concurrently read by CurrentOp. - stdx::lock_guard lk(*opCtx->getClient()); - readConcernArgs = readConcernArgsStatus.getValue(); - } - } - appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, startOperationTime); +Future ExecCommandDatabase::_commandExec() try { + if (!runCommandImpl(_execContext->getOpCtx(), + _invocation.get(), + _execContext->getRequest(), + _execContext->getReplyBuilder(), + _startOperationTime, + *_execContext->behaviors, + &_extraFieldsBuilder, + _sessionOptions)) { + _execContext->getCommand()->incrementCommandsFailed(); + } + return Status::OK(); +} catch (const DBException& ex) { + _execContext->getCommand()->incrementCommandsFailed(); + if (ex.code() == ErrorCodes::Unauthorized) { + CommandHelpers::auditLogAuthEvent( + _execContext->getOpCtx(), _invocation.get(), _execContext->getRequest(), ex.code()); + } + return ex.toStatus(); +} - LOGV2_DEBUG(21962, - 1, - "Assertion while executing command '{command}' on database '{db}' with " - "arguments '{commandArgs}': {error}", - "Assertion while executing command", - "command"_attr = request.getCommandName(), - "db"_attr = request.getDatabase(), - "commandArgs"_attr = redact( - ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)), - "error"_attr = redact(e.toString())); +void ExecCommandDatabase::_handleFailure(Status status) { + // Absorb the exception as the command execution has already been skipped. + if (status.code() == ErrorCodes::SkipCommandExecution) + return; - generateErrorResponse( - opCtx, replyBuilder, e.toStatus(), metadataBob.obj(), extraFieldsBuilder.obj()); + auto opCtx = _execContext->getOpCtx(); + auto& request = _execContext->getRequest(); + auto command = _execContext->getCommand(); + auto replyBuilder = _execContext->getReplyBuilder(); + const auto& behaviors = *_execContext->behaviors; - if (ErrorCodes::isA(e.code())) { - // Rethrow the exception to the top to signal that the client connection should be - // closed. - throw; + behaviors.handleException(status, opCtx); + + // Append the error labels for transient transaction errors. + auto response = _extraFieldsBuilder.asTempObj(); + boost::optional wcCode; + if (response.hasField("writeConcernError")) { + wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); + } + appendErrorLabelsAndTopologyVersion(opCtx, + &_extraFieldsBuilder, + _sessionOptions, + command->getName(), + status.code(), + wcCode, + _isInternalClient); + + BSONObjBuilder metadataBob; + behaviors.appendReplyMetadata(opCtx, request, &metadataBob); + + // The read concern may not have yet been placed on the operation context, so attempt to parse + // it here, so if it is valid it can be used to compute the proper operationTime. + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + if (readConcernArgs.isEmpty()) { + auto readConcernArgsStatus = _extractReadConcern( + opCtx, _invocation.get(), request.body, false /*startTransaction*/, _isInternalClient); + if (readConcernArgsStatus.isOK()) { + // We must obtain the client lock to set the ReadConcernArgs on the operation context as + // it may be concurrently read by CurrentOp. + stdx::lock_guard lk(*opCtx->getClient()); + readConcernArgs = readConcernArgsStatus.getValue(); } } + appendClusterAndOperationTime(opCtx, &_extraFieldsBuilder, &metadataBob, _startOperationTime); + + LOGV2_DEBUG(21962, + 1, + "Assertion while executing command '{command}' on database '{db}' with " + "arguments '{commandArgs}': {error}", + "Assertion while executing command", + "command"_attr = request.getCommandName(), + "db"_attr = request.getDatabase(), + "commandArgs"_attr = redact( + ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)), + "error"_attr = redact(status.toString())); + + generateErrorResponse( + opCtx, replyBuilder, status, metadataBob.obj(), _extraFieldsBuilder.obj()); + + if (ErrorCodes::isA(status.code())) { + // Rethrow the exception to the top to signal that the client connection should be closed. + internalAssert(status); + } } /** @@ -1386,7 +1437,6 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { curop->setNS_inlock(nss.ns()); } - Future parseCommand(std::shared_ptr execContext) try { execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage())); return Status::OK(); @@ -1467,14 +1517,7 @@ Future executeCommand(std::shared_ptr exe return Status::OK(); }) - .then([execContext]() -> Future { - execCommandDatabase(execContext->getOpCtx(), - execContext->getCommand(), - execContext->getRequest(), - execContext->getReplyBuilder(), - *execContext->behaviors); - return Status::OK(); - }) + .then([execContext] { return ExecCommandDatabase::run(std::move(execContext)); }) .tapError([execContext](Status status) { LOGV2_DEBUG( 21966, @@ -1586,7 +1629,7 @@ DbResponse receivedQuery(OperationContext* opCtx, dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response); } catch (const AssertionException& e) { - behaviors.handleException(e, opCtx); + behaviors.handleException(e.toStatus(), opCtx); dbResponse.response.reset(); generateLegacyQueryErrorResponse(e, q, &op, &dbResponse.response); diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h index 7c9dff81636..f0d7fd403a5 100644 --- a/src/mongo/db/service_entry_point_common.h +++ b/src/mongo/db/service_entry_point_common.h @@ -84,7 +84,7 @@ struct ServiceEntryPointCommon { virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0; - virtual void handleException(const DBException& e, OperationContext* opCtx) const = 0; + virtual void handleException(const Status& status, OperationContext* opCtx) const = 0; virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0; diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index 1e037cd3717..4347ffa8ac4 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -182,9 +182,9 @@ public: CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj); } - void handleException(const DBException& e, OperationContext* opCtx) const override { + void handleException(const Status& status, OperationContext* opCtx) const override { // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo()) { + if (auto sce = status.extraInfo()) { // A config server acting as a router may return a StaleConfig exception, but a config // server won't contain data for a sharded collection, so skip handling the exception. if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { @@ -204,7 +204,7 @@ public: onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived()) .ignore(); } - } else if (auto sce = e.extraInfo()) { + } else if (auto sce = status.extraInfo()) { if (!opCtx->getClient()->isInDirectClient()) { onDbVersionMismatchNoExcept( opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted()) diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp index a2ffc881b4b..158b55a10e6 100644 --- a/src/mongo/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/embedded/service_entry_point_embedded.cpp @@ -93,7 +93,7 @@ public: void attachCurOpErrInfo(OperationContext*, const BSONObj&) const override {} - void handleException(const DBException& e, OperationContext* opCtx) const override {} + void handleException(const Status& status, OperationContext* opCtx) const override {} void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {} -- cgit v1.2.1