summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-26 01:43:36 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-26 01:43:36 +0000
commit38f350478add12941539ec6f44034d5ba4443265 (patch)
tree5261c131d23ea05f45c6828022ae8f8a9fe81b9d
parent4c283d5c34ba9a9ba2ede5fa066dcdd9ab337651 (diff)
downloadmongo-38f350478add12941539ec6f44034d5ba4443265.tar.gz
SERVER-49107 Futurize and refactor execCommandDatabase()
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/service_entry_point_common.cpp719
-rw-r--r--src/mongo/db/service_entry_point_common.h2
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp6
-rw-r--r--src/mongo/embedded/service_entry_point_embedded.cpp2
5 files changed, 388 insertions, 343 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index f788660569e..5d38573d087 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -401,6 +401,8 @@ error_codes:
- {code: 327, name: NoSuchTenantMigration}
+ - {code: 328, name: SkipCommandExecution}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 085595c229b..d586c76c845 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -150,6 +150,10 @@ struct HandleRequest {
return *getOpCtx()->getClient();
}
+ auto session() const {
+ return client().session();
+ }
+
NetworkOp op() const {
return getMessage().operation();
}
@@ -983,390 +987,437 @@ bool runCommandImpl(OperationContext* opCtx,
return ok;
}
-/**
- * Executes a command after stripping metadata, performing authorization checks,
- * handling audit impersonation, and (potentially) setting maintenance mode. This method
- * also checks that the command is permissible to run on the node given its current
- * replication state. All the logic here is independent of any particular command; any
- * functionality relevant to a specific command should be confined to its run() method.
- */
-void execCommandDatabase(OperationContext* opCtx,
- Command* command,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder,
- const ServiceEntryPointCommon::Hooks& behaviors) {
- CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
- BSONObjBuilder extraFieldsBuilder;
- auto startOperationTime = getClientOperationTime(opCtx);
+class ExecCommandDatabase final {
+public:
+ explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext)
+ : _isInternalClient(execContext->session() &&
+ execContext->session()->getTags() &
+ transport::Session::kInternalClient),
+ _execContext(std::move(execContext)) {}
+
+ /**
+ * Returns a future that executes a command after stripping metadata, performing authorization
+ * checks, handling audit impersonation, and (potentially) setting maintenance mode. The future
+ * also checks that the command is permissible to run on the node given its current replication
+ * state. All the logic here is independent of any particular command; any functionality
+ * relevant to a specific command should be confined to its run() method.
+ */
+ static Future<void> run(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ auto opCtx = execContext->getOpCtx();
+ auto command = execContext->getCommand();
+ auto& request = execContext->getRequest();
+
+ auto instance = std::make_shared<ExecCommandDatabase>(std::move(execContext));
+
+ CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
+ instance->_startOperationTime = getClientOperationTime(opCtx);
+
+ instance->_invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, instance->_invocation);
+
+ return instance->_initiateCommand()
+ .then([instance] { return instance->_commandExec(); })
+ .onError([instance = std::move(instance)](Status status) mutable {
+ return instance->_handleFailure(std::move(status));
+ });
+ }
- std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
- CommandInvocation::set(opCtx, invocation);
+private:
+ // Any logic, such as authorization and auditing, that must precede execution of the command.
+ Future<void> _initiateCommand();
- OperationSessionInfoFromClient sessionOptions;
+ // Returns the future chain that executes the parsed command against the database.
+ Future<void> _commandExec();
- const auto isInternalClient = opCtx->getClient()->session() &&
- (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
+ // Any error-handling logic that must be performed if the command initiation/execution fails.
+ void _handleFailure(Status status);
- try {
- const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
- Client* client = opCtx->getClient();
+ // An indication on whether the command is initiated by an internal client.
+ const bool _isInternalClient;
- {
- stdx::lock_guard<Client> lk(*client);
- CurOp::get(opCtx)->setCommand_inlock(command);
- APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient);
- }
+ std::shared_ptr<HandleRequest::ExecutionContext> const _execContext;
- auto& apiParams = APIParameters::get(opCtx);
- auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
- const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata();
- if (clientMetadata) {
- auto appName = clientMetadata.get().getApplicationName().toString();
- apiVersionMetrics.update(appName, apiParams);
- }
+ // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share
+ // execution state without concerning the lifetime of these variables. In particular, `_scoped`
+ // is a scoped variable that once created, lives as long as its owner (i.e., an instance of
+ // `ExecCommandDatabase`) lives.
+ BSONObjBuilder _extraFieldsBuilder;
+ std::shared_ptr<CommandInvocation> _invocation;
+ LogicalTime _startOperationTime;
+ OperationSessionInfoFromClient _sessionOptions;
+ std::unique_ptr<PolymorphicScoped> _scoped;
+};
- sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) {
- auto numMillis = data["millis"].numberInt();
- auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();
- // Only sleep for one of the specified commands.
- if (commands.find(command->getName()) != commands.end()) {
- mongo::sleepmillis(numMillis);
- }
- });
+Future<void> ExecCommandDatabase::_initiateCommand() try {
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+ auto command = _execContext->getCommand();
+ auto replyBuilder = _execContext->getReplyBuilder();
+
+ const auto apiParamsFromClient = initializeAPIParameters(opCtx, request.body, command);
+ Client* client = opCtx->getClient();
- rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
- rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
+ {
+ stdx::lock_guard<Client> lk(*client);
+ CurOp::get(opCtx)->setCommand_inlock(command);
+ APIParameters::get(opCtx) = APIParameters::fromClient(apiParamsFromClient);
+ }
- auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ auto& apiParams = APIParameters::get(opCtx);
+ auto& apiVersionMetrics = APIVersionMetrics::get(opCtx->getServiceContext());
+ const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata();
+ if (clientMetadata) {
+ auto appName = clientMetadata.get().getApplicationName().toString();
+ apiVersionMetrics.update(appName, apiParams);
+ }
- sessionOptions = initializeOperationSessionInfo(
- opCtx,
- request.body,
- command->requiresAuth(),
- command->attachLogicalSessionsToOpCtx(),
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet);
+ sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) {
+ auto numMillis = data["millis"].numberInt();
+ auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();
+ // Only sleep for one of the specified commands.
+ if (commands.find(command->getName()) != commands.end()) {
+ mongo::sleepmillis(numMillis);
+ }
+ });
- CommandHelpers::evaluateFailCommandFailPoint(opCtx, invocation.get());
+ rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
+ rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
- const auto dbname = request.getDatabase().toString();
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << dbname << "'",
+ auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+
+ _sessionOptions = initializeOperationSessionInfo(opCtx,
+ request.body,
+ command->requiresAuth(),
+ command->attachLogicalSessionsToOpCtx(),
+ replCoord->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeReplSet);
+
+ CommandHelpers::evaluateFailCommandFailPoint(opCtx, _invocation.get());
+
+ const auto dbname = request.getDatabase().toString();
+ uassert(ErrorCodes::InvalidNamespace,
+ fmt::format("Invalid database name: '{}'", dbname),
NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
- const auto allowTransactionsOnConfigDatabase =
- (serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
- serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
- validateSessionOptions(sessionOptions,
- command->getName(),
- invocation->ns(),
- allowTransactionsOnConfigDatabase);
-
- std::unique_ptr<MaintenanceModeSetter> mmSetter;
-
- BSONElement cmdOptionMaxTimeMSField;
- BSONElement maxTimeMSOpOnlyField;
- BSONElement allowImplicitCollectionCreationField;
- BSONElement helpField;
-
- StringMap<int> topLevelFields;
- for (auto&& element : request.body) {
- StringData fieldName = element.fieldNameStringData();
- if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
- cmdOptionMaxTimeMSField = element;
- } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) {
- uassert(ErrorCodes::InvalidOptions,
- "Can not specify maxTimeMSOpOnly for non internal clients",
- isInternalClient);
- maxTimeMSOpOnlyField = element;
- } else if (fieldName == "allowImplicitCollectionCreation") {
- allowImplicitCollectionCreationField = element;
- } else if (fieldName == CommandHelpers::kHelpFieldName) {
- helpField = element;
- } else if (fieldName == "comment") {
- opCtx->setComment(element.wrap());
- } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
- uasserted(ErrorCodes::InvalidOptions,
- "no such command option $maxTimeMs; use maxTimeMS instead");
- }
+ const auto allowTransactionsOnConfigDatabase =
+ (serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
+ serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Parsed command object contains duplicate top level key: "
- << fieldName,
- topLevelFields[fieldName]++ == 0);
- }
+ validateSessionOptions(
+ _sessionOptions, command->getName(), _invocation->ns(), allowTransactionsOnConfigDatabase);
- if (CommandHelpers::isHelpRequest(helpField)) {
- CurOp::get(opCtx)->ensureStarted();
- // We disable last-error for help requests due to SERVER-11492, because config
- // servers use help requests to determine which commands are database writes, and so
- // must be forwarded to all config servers.
- LastError::get(opCtx->getClient()).disable();
- Command::generateHelpResponse(opCtx, replyBuilder, *command);
- return;
+ std::unique_ptr<MaintenanceModeSetter> mmSetter;
+
+ BSONElement cmdOptionMaxTimeMSField;
+ BSONElement maxTimeMSOpOnlyField;
+ BSONElement allowImplicitCollectionCreationField;
+ BSONElement helpField;
+
+ StringMap<int> topLevelFields;
+ for (auto&& element : request.body) {
+ StringData fieldName = element.fieldNameStringData();
+ if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
+ cmdOptionMaxTimeMSField = element;
+ } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Can not specify maxTimeMSOpOnly for non internal clients",
+ _isInternalClient);
+ maxTimeMSOpOnlyField = element;
+ } else if (fieldName == "allowImplicitCollectionCreation") {
+ allowImplicitCollectionCreationField = element;
+ } else if (fieldName == CommandHelpers::kHelpFieldName) {
+ helpField = element;
+ } else if (fieldName == "comment") {
+ opCtx->setComment(element.wrap());
+ } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
+ uasserted(ErrorCodes::InvalidOptions,
+ "no such command option $maxTimeMs; use maxTimeMS instead");
}
- ImpersonationSessionGuard guard(opCtx);
- invocation->checkAuthorization(opCtx, request);
-
- const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
-
- if (!opCtx->getClient()->isInDirectClient() &&
- !MONGO_unlikely(skipCheckingForNotMasterInCommandDispatch.shouldFail())) {
- const bool inMultiDocumentTransaction = (sessionOptions.getAutocommit() == false);
- auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
- bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
- bool couldHaveOptedIn =
- allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction;
- bool optedIn =
- couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
- bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction);
- if (!canRunHere && couldHaveOptedIn) {
- uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, "not master and slaveOk=false");
- }
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Parsed command object contains duplicate top level key: "
+ << fieldName,
+ topLevelFields[fieldName]++ == 0);
+ }
- if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) {
- uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere);
- } else {
- uassert(ErrorCodes::NotWritablePrimary, "not master", canRunHere);
- }
+ if (CommandHelpers::isHelpRequest(helpField)) {
+ CurOp::get(opCtx)->ensureStarted();
+ // We disable last-error for help requests due to SERVER-11492, because config servers use
+ // help requests to determine which commands are database writes, and so must be forwarded
+ // to all config servers.
+ LastError::get(opCtx->getClient()).disable();
+ Command::generateHelpResponse(opCtx, replyBuilder, *command);
+ return Status(ErrorCodes::SkipCommandExecution,
+ "Skipping command execution for help request");
+ }
- if (!command->maintenanceOk() &&
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
- !replCoord->getMemberState().secondary()) {
-
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is recovering",
- !replCoord->getMemberState().recovering());
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is not in primary or recovering state",
- replCoord->getMemberState().primary());
- // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
- uassert(ErrorCodes::NotPrimaryOrSecondary,
- "node is in drain mode",
- optedIn || alwaysAllowed);
- }
+ ImpersonationSessionGuard guard(opCtx);
+ _invocation->checkAuthorization(opCtx, request);
+
+ const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ !MONGO_unlikely(skipCheckingForNotMasterInCommandDispatch.shouldFail())) {
+ const bool inMultiDocumentTransaction = (_sessionOptions.getAutocommit() == false);
+ auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
+ bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
+ bool couldHaveOptedIn =
+ allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction;
+ bool optedIn = couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
+ bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction);
+ if (!canRunHere && couldHaveOptedIn) {
+ uasserted(ErrorCodes::NotPrimaryNoSecondaryOk, "not master and slaveOk=false");
}
- if (command->adminOnly()) {
- LOGV2_DEBUG(21961,
- 2,
- "Admin only command: {command}",
- "Admin only command",
- "command"_attr = request.getCommandName());
+ if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) {
+ uassert(ErrorCodes::NotWritablePrimary, "not primary", canRunHere);
+ } else {
+ uassert(ErrorCodes::NotWritablePrimary, "not master", canRunHere);
}
- if (command->maintenanceMode()) {
- mmSetter.reset(new MaintenanceModeSetter(opCtx));
+ if (!command->maintenanceOk() &&
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
+ !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
+ !replCoord->getMemberState().secondary()) {
+
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is recovering",
+ !replCoord->getMemberState().recovering());
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is not in primary or recovering state",
+ replCoord->getMemberState().primary());
+ // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
+ uassert(ErrorCodes::NotPrimaryOrSecondary,
+ "node is in drain mode",
+ optedIn || alwaysAllowed);
}
+ }
- if (command->shouldAffectCommandCounter()) {
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotCommand();
- }
+ if (command->adminOnly()) {
+ LOGV2_DEBUG(21961,
+ 2,
+ "Admin only command: {command}",
+ "Admin only command",
+ "command"_attr = request.getCommandName());
+ }
- // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation
- // on the OperationContext. The 'maxTimeMS' option unfortunately has a different meaning
- // for a getMore command, where it is used to communicate the maximum time to wait for
- // new inserts on tailable cursors, not as a deadline for the operation.
- // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will
- // require introducing a new 'max await time' parameter for getMore, and eventually
- // banning maxTimeMS altogether on a getMore command.
- int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
- int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField));
-
- // The "hello" command should not inherit the deadline from the user op it is operating as a
- // part of as that can interfere with replica set monitoring and host selection.
- bool ignoreMaxTimeMSOpOnly = command->getName() == "hello"_sd;
-
- if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) &&
- command->getLogicalOp() != LogicalOp::opGetMore) {
- uassert(40119,
- "Illegal attempt to set operation deadline within DBDirectClient",
- !opCtx->getClient()->isInDirectClient());
- if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 &&
- (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) {
- opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS});
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly},
- ErrorCodes::MaxTimeMSExpired);
- } else if (maxTimeMS > 0) {
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
- }
+ if (command->maintenanceMode()) {
+ mmSetter.reset(new MaintenanceModeSetter(opCtx));
+ }
+
+ if (command->shouldAffectCommandCounter()) {
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotCommand();
+ }
+
+ // Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on the
+ // OperationContext. The 'maxTimeMS' option unfortunately has a different meaning for a getMore
+ // command, where it is used to communicate the maximum time to wait for new inserts on tailable
+ // cursors, not as a deadline for the operation.
+ // TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will require
+ // introducing a new 'max await time' parameter for getMore, and eventually banning maxTimeMS
+ // altogether on a getMore command.
+ int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
+ int maxTimeMSOpOnly = uassertStatusOK(QueryRequest::parseMaxTimeMS(maxTimeMSOpOnlyField));
+
+ // The "hello" command should not inherit the deadline from the user op it is operating as a
+ // part of as that can interfere with replica set monitoring and host selection.
+ bool ignoreMaxTimeMSOpOnly = command->getName() == "hello"_sd;
+
+ if ((maxTimeMS > 0 || maxTimeMSOpOnly > 0) && command->getLogicalOp() != LogicalOp::opGetMore) {
+ uassert(40119,
+ "Illegal attempt to set operation deadline within DBDirectClient",
+ !opCtx->getClient()->isInDirectClient());
+ if (!ignoreMaxTimeMSOpOnly && maxTimeMSOpOnly > 0 &&
+ (maxTimeMS == 0 || maxTimeMSOpOnly < maxTimeMS)) {
+ opCtx->storeMaxTimeMS(Milliseconds{maxTimeMS});
+ opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMSOpOnly},
+ ErrorCodes::MaxTimeMSExpired);
+ } else if (maxTimeMS > 0) {
+ opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
}
+ }
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- // If the parent operation runs in a transaction, we don't override the read concern.
- auto skipReadConcern =
- opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();
- bool startTransaction = static_cast<bool>(sessionOptions.getStartTransaction());
- if (!skipReadConcern) {
- auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(
- opCtx, invocation.get(), request.body, startTransaction, isInternalClient));
+ // If the parent operation runs in a transaction, we don't override the read concern.
+ auto skipReadConcern =
+ opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();
+ bool startTransaction = static_cast<bool>(_sessionOptions.getStartTransaction());
+ if (!skipReadConcern) {
+ auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(
+ opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient));
- // Ensure that the RC being set on the opCtx has provenance.
- invariant(newReadConcernArgs.getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on readConcern: "
- << newReadConcernArgs.toBSONInner());
+ // Ensure that the RC being set on the opCtx has provenance.
+ invariant(newReadConcernArgs.getProvenance().hasSource(),
+ str::stream() << "unexpected unset provenance on readConcern: "
+ << newReadConcernArgs.toBSONInner());
- uassert(ErrorCodes::InvalidOptions,
- "Only the first command in a transaction may specify a readConcern",
- startTransaction || !opCtx->inMultiDocumentTransaction() ||
- newReadConcernArgs.isEmpty());
-
- {
- // We must obtain the client lock to set the ReadConcernArgs on the operation
- // context as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = std::move(newReadConcernArgs);
- }
- }
+ uassert(ErrorCodes::InvalidOptions,
+ "Only the first command in a transaction may specify a readConcern",
+ startTransaction || !opCtx->inMultiDocumentTransaction() ||
+ newReadConcernArgs.isEmpty());
- if (startTransaction) {
- opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
- opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ {
+ // We must obtain the client lock to set the ReadConcernArgs on the operation context as
+ // it may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = std::move(newReadConcernArgs);
}
+ }
- if (opCtx->inMultiDocumentTransaction() && !startTransaction) {
- uassert(4937700,
- "API parameters are only allowed in the first command of a multi-document "
- "transaction",
- !APIParameters::get(opCtx).getParamsPassed());
- }
+ if (startTransaction) {
+ opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ }
- // Remember whether or not this operation is starting a transaction, in case something
- // later in the execution needs to adjust its behavior based on this.
- opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
+ if (opCtx->inMultiDocumentTransaction() && !startTransaction) {
+ uassert(4937700,
+ "API parameters are only allowed in the first command of a multi-document "
+ "transaction",
+ !APIParameters::get(opCtx).getParamsPassed());
+ }
- auto& oss = OperationShardingState::get(opCtx);
+ // Remember whether or not this operation is starting a transaction, in case something later in
+ // the execution needs to adjust its behavior based on this.
+ opCtx->setIsStartingMultiDocumentTransaction(startTransaction);
- if (!opCtx->getClient()->isInDirectClient() &&
- readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
- (iAmPrimary ||
- (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
- oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body);
+ auto& oss = OperationShardingState::get(opCtx);
- auto const shardingState = ShardingState::get(opCtx);
- if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) {
- uassertStatusOK(shardingState->canAcceptShardedCommands());
- }
+ if (!opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
+ (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
+ oss.initializeClientRoutingVersionsFromCommand(_invocation->ns(), request.body);
- behaviors.advanceConfigOpTimeFromRequestMetadata(opCtx);
+ auto const shardingState = ShardingState::get(opCtx);
+ if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) {
+ uassertStatusOK(shardingState->canAcceptShardedCommands());
}
- oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);
- auto scoped = behaviors.scopedOperationCompletionShardingActions(opCtx);
+ _execContext->behaviors->advanceConfigOpTimeFromRequestMetadata(opCtx);
+ }
- // This may trigger the maxTimeAlwaysTimeOut failpoint.
- auto status = opCtx->checkForInterruptNoAssert();
+ oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);
+ _scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx);
- // We still proceed if the primary stepped down, but accept other kinds of
- // interruptions. We defer to individual commands to allow themselves to be
- // interruptible by stepdowns, since commands like 'voteRequest' should conversely
- // continue executing.
- if (status != ErrorCodes::PrimarySteppedDown &&
- status != ErrorCodes::InterruptedDueToReplStateChange) {
- uassertStatusOK(status);
- }
+ // This may trigger the maxTimeAlwaysTimeOut failpoint.
+ auto status = opCtx->checkForInterruptNoAssert();
+ // We still proceed if the primary stepped down, but accept other kinds of interruptions. We
+ // defer to individual commands to allow themselves to be interruptible by stepdowns, since
+ // commands like 'voteRequest' should conversely continue executing.
+ if (status != ErrorCodes::PrimarySteppedDown &&
+ status != ErrorCodes::InterruptedDueToReplStateChange) {
+ uassertStatusOK(status);
+ }
- CurOp::get(opCtx)->ensureStarted();
+ CurOp::get(opCtx)->ensureStarted();
- command->incrementCommandsExecuted();
-
- if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) &&
- rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
- LOGV2_DEBUG_OPTIONS(4615605,
- 1,
- {logv2::LogComponent::kTracking},
- "Command metadata: {trackingMetadata}",
- "Command metadata",
- "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx));
- rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
- }
+ command->incrementCommandsExecuted();
- behaviors.waitForReadConcern(opCtx, invocation.get(), request);
- behaviors.setPrepareConflictBehaviorForReadConcern(opCtx, invocation.get());
+ if (shouldLog(logv2::LogComponent::kTracking, logv2::LogSeverity::Debug(1)) &&
+ rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
+ LOGV2_DEBUG_OPTIONS(4615605,
+ 1,
+ {logv2::LogComponent::kTracking},
+ "Command metadata: {trackingMetadata}",
+ "Command metadata",
+ "trackingMetadata"_attr = rpc::TrackingMetadata::get(opCtx));
+ rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
+ }
- try {
- if (!runCommandImpl(opCtx,
- invocation.get(),
- request,
- replyBuilder,
- startOperationTime,
- behaviors,
- &extraFieldsBuilder,
- sessionOptions)) {
- command->incrementCommandsFailed();
- }
- } catch (const DBException& e) {
- command->incrementCommandsFailed();
- if (e.code() == ErrorCodes::Unauthorized) {
- CommandHelpers::auditLogAuthEvent(opCtx, invocation.get(), request, e.code());
- }
- throw;
- }
- } catch (const DBException& e) {
- behaviors.handleException(e, opCtx);
+ _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
+ _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
+ return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
+}
- // Append the error labels for transient transaction errors.
- auto response = extraFieldsBuilder.asTempObj();
- boost::optional<ErrorCodes::Error> wcCode;
- if (response.hasField("writeConcernError")) {
- wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
- }
- appendErrorLabelsAndTopologyVersion(opCtx,
- &extraFieldsBuilder,
- sessionOptions,
- command->getName(),
- e.code(),
- wcCode,
- isInternalClient);
-
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadata(opCtx, request, &metadataBob);
-
- // The read concern may not have yet been placed on the operation context, so attempt to
- // parse it here, so if it is valid it can be used to compute the proper operationTime.
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.isEmpty()) {
- auto readConcernArgsStatus = _extractReadConcern(opCtx,
- invocation.get(),
- request.body,
- false /*startTransaction*/,
- isInternalClient);
- if (readConcernArgsStatus.isOK()) {
- // We must obtain the client lock to set the ReadConcernArgs on the operation
- // context as it may be concurrently read by CurrentOp.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- readConcernArgs = readConcernArgsStatus.getValue();
- }
- }
- appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, startOperationTime);
+Future<void> ExecCommandDatabase::_commandExec() try {
+ if (!runCommandImpl(_execContext->getOpCtx(),
+ _invocation.get(),
+ _execContext->getRequest(),
+ _execContext->getReplyBuilder(),
+ _startOperationTime,
+ *_execContext->behaviors,
+ &_extraFieldsBuilder,
+ _sessionOptions)) {
+ _execContext->getCommand()->incrementCommandsFailed();
+ }
+ return Status::OK();
+} catch (const DBException& ex) {
+ _execContext->getCommand()->incrementCommandsFailed();
+ if (ex.code() == ErrorCodes::Unauthorized) {
+ CommandHelpers::auditLogAuthEvent(
+ _execContext->getOpCtx(), _invocation.get(), _execContext->getRequest(), ex.code());
+ }
+ return ex.toStatus();
+}
- LOGV2_DEBUG(21962,
- 1,
- "Assertion while executing command '{command}' on database '{db}' with "
- "arguments '{commandArgs}': {error}",
- "Assertion while executing command",
- "command"_attr = request.getCommandName(),
- "db"_attr = request.getDatabase(),
- "commandArgs"_attr = redact(
- ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)),
- "error"_attr = redact(e.toString()));
+void ExecCommandDatabase::_handleFailure(Status status) {
+ // Absorb the exception as the command execution has already been skipped.
+ if (status.code() == ErrorCodes::SkipCommandExecution)
+ return;
- generateErrorResponse(
- opCtx, replyBuilder, e.toStatus(), metadataBob.obj(), extraFieldsBuilder.obj());
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+ auto command = _execContext->getCommand();
+ auto replyBuilder = _execContext->getReplyBuilder();
+ const auto& behaviors = *_execContext->behaviors;
- if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(e.code())) {
- // Rethrow the exception to the top to signal that the client connection should be
- // closed.
- throw;
+ behaviors.handleException(status, opCtx);
+
+ // Append the error labels for transient transaction errors.
+ auto response = _extraFieldsBuilder.asTempObj();
+ boost::optional<ErrorCodes::Error> wcCode;
+ if (response.hasField("writeConcernError")) {
+ wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
+ }
+ appendErrorLabelsAndTopologyVersion(opCtx,
+ &_extraFieldsBuilder,
+ _sessionOptions,
+ command->getName(),
+ status.code(),
+ wcCode,
+ _isInternalClient);
+
+ BSONObjBuilder metadataBob;
+ behaviors.appendReplyMetadata(opCtx, request, &metadataBob);
+
+ // The read concern may not have yet been placed on the operation context, so attempt to parse
+ // it here, so if it is valid it can be used to compute the proper operationTime.
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (readConcernArgs.isEmpty()) {
+ auto readConcernArgsStatus = _extractReadConcern(
+ opCtx, _invocation.get(), request.body, false /*startTransaction*/, _isInternalClient);
+ if (readConcernArgsStatus.isOK()) {
+ // We must obtain the client lock to set the ReadConcernArgs on the operation context as
+ // it may be concurrently read by CurrentOp.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ readConcernArgs = readConcernArgsStatus.getValue();
}
}
+ appendClusterAndOperationTime(opCtx, &_extraFieldsBuilder, &metadataBob, _startOperationTime);
+
+ LOGV2_DEBUG(21962,
+ 1,
+ "Assertion while executing command '{command}' on database '{db}' with "
+ "arguments '{commandArgs}': {error}",
+ "Assertion while executing command",
+ "command"_attr = request.getCommandName(),
+ "db"_attr = request.getDatabase(),
+ "commandArgs"_attr = redact(
+ ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)),
+ "error"_attr = redact(status.toString()));
+
+ generateErrorResponse(
+ opCtx, replyBuilder, status, metadataBob.obj(), _extraFieldsBuilder.obj());
+
+ if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(status.code())) {
+ // Rethrow the exception to the top to signal that the client connection should be closed.
+ internalAssert(status);
+ }
}
/**
@@ -1386,7 +1437,6 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
curop->setNS_inlock(nss.ns());
}
-
Future<void> parseCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) try {
execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage()));
return Status::OK();
@@ -1467,14 +1517,7 @@ Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> exe
return Status::OK();
})
- .then([execContext]() -> Future<void> {
- execCommandDatabase(execContext->getOpCtx(),
- execContext->getCommand(),
- execContext->getRequest(),
- execContext->getReplyBuilder(),
- *execContext->behaviors);
- return Status::OK();
- })
+ .then([execContext] { return ExecCommandDatabase::run(std::move(execContext)); })
.tapError([execContext](Status status) {
LOGV2_DEBUG(
21966,
@@ -1586,7 +1629,7 @@ DbResponse receivedQuery(OperationContext* opCtx,
dbResponse.shouldRunAgainForExhaust = runQuery(opCtx, q, nss, dbResponse.response);
} catch (const AssertionException& e) {
- behaviors.handleException(e, opCtx);
+ behaviors.handleException(e.toStatus(), opCtx);
dbResponse.response.reset();
generateLegacyQueryErrorResponse(e, q, &op, &dbResponse.response);
diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h
index 7c9dff81636..f0d7fd403a5 100644
--- a/src/mongo/db/service_entry_point_common.h
+++ b/src/mongo/db/service_entry_point_common.h
@@ -84,7 +84,7 @@ struct ServiceEntryPointCommon {
virtual void attachCurOpErrInfo(OperationContext* opCtx, const BSONObj& replyObj) const = 0;
- virtual void handleException(const DBException& e, OperationContext* opCtx) const = 0;
+ virtual void handleException(const Status& status, OperationContext* opCtx) const = 0;
virtual void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const = 0;
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 1e037cd3717..4347ffa8ac4 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -182,9 +182,9 @@ public:
CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj);
}
- void handleException(const DBException& e, OperationContext* opCtx) const override {
+ void handleException(const Status& status, OperationContext* opCtx) const override {
// If we got a stale config, wait in case the operation is stuck in a critical section
- if (auto sce = e.extraInfo<StaleConfigInfo>()) {
+ if (auto sce = status.extraInfo<StaleConfigInfo>()) {
// A config server acting as a router may return a StaleConfig exception, but a config
// server won't contain data for a sharded collection, so skip handling the exception.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
@@ -204,7 +204,7 @@ public:
onShardVersionMismatchNoExcept(opCtx, sce->getNss(), sce->getVersionReceived())
.ignore();
}
- } else if (auto sce = e.extraInfo<StaleDbRoutingVersion>()) {
+ } else if (auto sce = status.extraInfo<StaleDbRoutingVersion>()) {
if (!opCtx->getClient()->isInDirectClient()) {
onDbVersionMismatchNoExcept(
opCtx, sce->getDb(), sce->getVersionReceived(), sce->getVersionWanted())
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index a2ffc881b4b..158b55a10e6 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -93,7 +93,7 @@ public:
void attachCurOpErrInfo(OperationContext*, const BSONObj&) const override {}
- void handleException(const DBException& e, OperationContext* opCtx) const override {}
+ void handleException(const Status& status, OperationContext* opCtx) const override {}
void advanceConfigOpTimeFromRequestMetadata(OperationContext* opCtx) const override {}