From 1d6af89487957dfa75835b60f4c09e8c3cdf5cd5 Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Wed, 14 Oct 2020 17:17:30 +0000 Subject: SERVER-49107 Futurize and refactor runCommandImpl() --- src/mongo/db/service_entry_point_common.cpp | 627 +++++++++++++++++----------- 1 file changed, 385 insertions(+), 242 deletions(-) diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index d586c76c845..58aec2a488b 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -590,22 +590,184 @@ void _abortUnpreparedOrStashPreparedTransaction( } } -void invokeWithNoSession(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - rpc::ReplyBuilderInterface* replyBuilder) { +class ExecCommandDatabase : public std::enable_shared_from_this { +public: + explicit ExecCommandDatabase(std::shared_ptr execContext) + : _execContext(std::move(execContext)) {} + + static Future run(std::shared_ptr execContext) { + return std::make_shared(std::move(execContext))->_makeFutureChain(); + } + + std::shared_ptr getExecutionContext() { + return _execContext; + } + std::shared_ptr getInvocation() { + return _invocation; + } + const OperationSessionInfoFromClient& getSessionOptions() const { + return _sessionOptions; + } + BSONObjBuilder* getExtraFieldsBuilder() { + return &_extraFieldsBuilder; + } + const LogicalTime& getStartOperationTime() const { + return _startOperationTime; + } + +private: + // Returns a future that executes a command after stripping metadata, performing authorization + // checks, handling audit impersonation, and (potentially) setting maintenance mode. The future + // also checks that the command is permissible to run on the node given its current replication + // state. All the logic here is independent of any particular command; any functionality + // relevant to a specific command should be confined to its run() method. + Future _makeFutureChain() { + return _parseCommand().then([this, anchor = shared_from_this()] { + return _initiateCommand() + .then([this] { return _commandExec(); }) + .onError([this, anchor = shared_from_this()](Status status) { + return _handleFailure(std::move(status)); + }); + }); + } + + Future _parseCommand() { + auto pf = makePromiseFuture(); + auto future = std::move(pf.future).then([this, anchor = shared_from_this()] { + auto opCtx = _execContext->getOpCtx(); + auto command = _execContext->getCommand(); + auto& request = _execContext->getRequest(); + + CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); + _startOperationTime = getClientOperationTime(opCtx); + + _invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, _invocation); + }); + pf.promise.emplaceValue(); + return future; + } + + // Any logic, such as authorization and auditing, that must precede execution of the command. + Future _initiateCommand(); + + // Returns the future chain that executes the parsed command against the database. + Future _commandExec(); + + // Any error-handling logic that must be performed if the command initiation/execution fails. + void _handleFailure(Status status); + + bool _isInternalClient() const { + return _execContext->session() && + _execContext->session()->getTags() & transport::Session::kInternalClient; + } + + const std::shared_ptr _execContext; + + // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share + // execution state without concerning the lifetime of these variables. + BSONObjBuilder _extraFieldsBuilder; + std::shared_ptr _invocation; + LogicalTime _startOperationTime; + OperationSessionInfoFromClient _sessionOptions; + std::unique_ptr _scoped; +}; + +class RunCommandImpl : public std::enable_shared_from_this { +public: + explicit RunCommandImpl(std::shared_ptr ecd) + : _ecd(std::move(ecd)), + _shouldCheckOutSession( + _ecd->getSessionOptions().getTxnNumber() && + !shouldCommandSkipSessionCheckout(_ecd->getInvocation()->definition()->getName())), + _shouldWaitForWriteConcern(_ecd->getInvocation()->supportsWriteConcern() || + _ecd->getInvocation()->definition()->getLogicalOp() == + LogicalOp::opGetMore) {} + + static Future run(std::shared_ptr ecd) { + return std::make_shared(std::move(ecd))->_makeFutureChain(); + } + +private: + Future _makeFutureChain(); + + // Anchor for references to attributes defined in `ExecCommandDatabase` (e.g., sessionOptions). + const std::shared_ptr _ecd; + + // Any code that must run before command execution (e.g., reserving bytes for reply builder). + Future _prologue(); + + // Runs the command without waiting for write concern + Future _runCommand(); + + class RunCommandAndWaitForWriteConcern { + public: + explicit RunCommandAndWaitForWriteConcern(std::shared_ptr rci) + : _rci(std::move(rci)), + _execContext(_rci->_ecd->getExecutionContext()), + _oldWriteConcern(_execContext->getOpCtx()->getWriteConcern()) {} + + ~RunCommandAndWaitForWriteConcern() { + _execContext->getOpCtx()->setWriteConcern(_oldWriteConcern); + } + + static Future run(std::shared_ptr); + + private: + void _waitForWriteConcern(BSONObjBuilder& bb); + + void _setup(); + Future _run(); + Future _onRunCompletion(Status); + + const std::shared_ptr _rci; + const std::shared_ptr _execContext; + + // Allows changing the write concern while running the command and resetting on destruction. + const WriteConcernOptions _oldWriteConcern; + boost::optional _lastOpBeforeRun; + boost::optional _extractedWriteConcern; + }; + + // Any code that must run after command execution -- returns true on successful execution. + Future _epilogue(); + + bool _isInternalClient() const { + auto session = _ecd->getExecutionContext()->session(); + return session && session->getTags() & transport::Session::kInternalClient; + } + + // Whether invoking the command requires a session to be checked out. + const bool _shouldCheckOutSession; + + // getMore operations inherit a WriteConcern from their originating cursor. For example, if the + // originating command was an aggregate with a $out and batchSize: 0. Note that if the command + // only performed reads then we will not need to wait at all. + const bool _shouldWaitForWriteConcern; +}; + +Future invokeWithNoSession(std::shared_ptr ecd) { + auto execContext = ecd->getExecutionContext(); + OperationContext* opCtx = execContext->getOpCtx(); + const OpMsgRequest& request = execContext->getRequest(); + CommandInvocation* invocation = ecd->getInvocation().get(); + rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); tenant_migration_donor::migrationConflictHandler( opCtx, request.getDatabase(), [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, replyBuilder); + return Status::OK(); } -void invokeWithSessionCheckedOut(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - const OperationSessionInfoFromClient& sessionOptions, - rpc::ReplyBuilderInterface* replyBuilder) { +Future invokeWithSessionCheckedOut(std::shared_ptr ecd) { + auto execContext = ecd->getExecutionContext(); + OperationContext* opCtx = execContext->getOpCtx(); + const OpMsgRequest& request = execContext->getRequest(); + CommandInvocation* invocation = ecd->getInvocation().get(); + const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions(); + rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); + // This constructor will check out the session. It handles the appropriate state management // for both multi-statement transactions and retryable writes. Currently, only requests with // a transaction number will check out the session. @@ -742,7 +904,7 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) { // If ok is present, use its truthiness. if (!okField.trueValue()) { - return; + return Status::OK(); } } @@ -756,17 +918,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, auto bodyBuilder = replyBuilder->getBodyBuilder(); txnResponseMetadata.serialize(&bodyBuilder); } + return Status::OK(); } -bool runCommandImpl(OperationContext* opCtx, - CommandInvocation* invocation, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - LogicalTime startOperationTime, - const ServiceEntryPointCommon::Hooks& behaviors, - BSONObjBuilder* extraFieldsBuilder, - const OperationSessionInfoFromClient& sessionOptions) { - const Command* command = invocation->definition(); +Future RunCommandImpl::_prologue() try { + auto execContext = _ecd->getExecutionContext(); + auto opCtx = execContext->getOpCtx(); + const Command* command = _ecd->getInvocation()->definition(); auto bytesToReserve = command->reserveBytesForReply(); // SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the // additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency @@ -775,19 +933,7 @@ bool runCommandImpl(OperationContext* opCtx, if (kDebugBuild) bytesToReserve = 0; #endif - replyBuilder->reserveBytes(bytesToReserve); - - const auto isInternalClient = opCtx->getClient()->session() && - (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); - - const bool shouldCheckOutSession = - sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName()); - - // getMore operations inherit a WriteConcern from their originating cursor. For example, if the - // originating command was an aggregate with a $out and batchSize: 0. Note that if the command - // only performed reads then we will not need to wait at all. - const bool shouldWaitForWriteConcern = - invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore; + execContext->getReplyBuilder()->reserveBytes(bytesToReserve); // Record readConcern usages for commands run outside of transactions, excluding DBDirectClient. // For commands inside a transaction, they inherit the readConcern from the transaction. So we @@ -797,139 +943,18 @@ bool runCommandImpl(OperationContext* opCtx, ServerReadConcernMetrics::get(opCtx)->recordReadConcern(repl::ReadConcernArgs::get(opCtx), false /* isTransaction */); } + return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); +} - if (shouldWaitForWriteConcern) { - auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - - boost::optional extractedWriteConcern; - if (command->getLogicalOp() == LogicalOp::opGetMore) { - // WriteConcern will be set up during command processing, it must not be specified on - // the command body. - behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - } else { - // WriteConcern should always be explicitly specified by operations received on shard - // and config servers, even if it is empty (ie. writeConcern: {}). In this context - // (shard/config servers) an empty WC indicates the operation should use the implicit - // server defaults. So, warn if the operation has not specified writeConcern and is on - // a shard/config server. - if (!opCtx->getClient()->isInDirectClient() && - (!opCtx->inMultiDocumentTransaction() || - isTransactionCommand(command->getName()))) { - if (isInternalClient) { - // WriteConcern should always be explicitly specified by operations received - // from internal clients (ie. from a mongos or mongod), even if it is empty - // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }). - uassert( - 4569201, - "received command without explicit writeConcern on an internalClient connection {}"_format( - redact(request.body.toString())), - request.body.hasField(WriteConcernOptions::kWriteConcernField)); - } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || - serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) { - // TODO: Disabled until after SERVER-44539, to avoid log spam. - // LOGV2(21959, "Missing writeConcern on {command}", "Missing " - // "writeConcern on command", "command"_attr = command->getName()); - } - } - } - extractedWriteConcern.emplace( - uassertStatusOK(extractWriteConcern(opCtx, request.body, isInternalClient))); - if (sessionOptions.getAutocommit()) { - validateWriteConcernForTransaction(*extractedWriteConcern, - invocation->definition()->getName()); - } - - // Ensure that the WC being set on the opCtx has provenance. - invariant(extractedWriteConcern->getProvenance().hasSource(), - str::stream() << "unexpected unset provenance on writeConcern: " - << extractedWriteConcern->toBSON()); - - opCtx->setWriteConcern(*extractedWriteConcern); - } - - auto waitForWriteConcern = [&](auto&& bb) { - bool reallyWait = true; - failCommand.executeIf( - [&](const BSONObj& data) { - bb.append(data["writeConcernError"]); - reallyWait = false; - if (data.hasField(kErrorLabelsFieldName) && - data[kErrorLabelsFieldName].type() == Array) { - // Propagate error labels specified in the failCommand failpoint to the - // OperationContext decoration to override getErrorLabels() behaviors. - invariant(!errorLabelsOverride(opCtx)); - errorLabelsOverride(opCtx).emplace( - data.getObjectField(kErrorLabelsFieldName).getOwned()); - } - }, - [&](const BSONObj& data) { - return CommandHelpers::shouldActivateFailCommandFailPoint( - data, invocation, opCtx->getClient()) && - data.hasField("writeConcernError"); - }); - if (reallyWait) { - CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern()); - behaviors.waitForWriteConcern(opCtx, invocation, lastOpBeforeRun, bb); - } - }; - - try { - if (auto scoped = failWithErrorCodeInRunCommand.scoped(); - MONGO_unlikely(scoped.isActive())) { - const auto errorCode = scoped.getData()["errorCode"].numberInt(); - LOGV2(21960, - "failWithErrorCodeInRunCommand enabled - failing command with error " - "code: {errorCode}", - "failWithErrorCodeInRunCommand enabled, failing command", - "errorCode"_attr = errorCode); - BSONObjBuilder errorBuilder; - errorBuilder.append("ok", 0.0); - errorBuilder.append("code", errorCode); - errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled."); - replyBuilder->setCommandReply(errorBuilder.obj()); - } else if (shouldCheckOutSession) { - invokeWithSessionCheckedOut( - opCtx, request, invocation, sessionOptions, replyBuilder); - } else { - invokeWithNoSession(opCtx, request, invocation, replyBuilder); - } - } catch (const DBException& ex) { - // Do no-op write before returning NoSuchTransaction if command has writeConcern. - if (ex.toStatus().code() == ErrorCodes::NoSuchTransaction && - !opCtx->getWriteConcern().usedDefault) { - TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error"); - } - waitForWriteConcern(*extraFieldsBuilder); - throw; - } - - waitForWriteConcern(replyBuilder->getBodyBuilder()); - - // With the exception of getMores inheriting the WriteConcern from the originating command, - // nothing in run() should change the writeConcern. - if (command->getLogicalOp() == LogicalOp::opGetMore) { - dassert(!extractedWriteConcern, - "opGetMore contained unexpected extracted write concern"); - } else { - dassert(extractedWriteConcern, "no extracted write concern"); - dassert(opCtx->getWriteConcern() == extractedWriteConcern, - "opCtx wc: {} extracted wc: {}"_format( - opCtx->getWriteConcern().toBSON().jsonString(), - extractedWriteConcern->toBSON().jsonString())); - } - } else { - behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - if (shouldCheckOutSession) { - invokeWithSessionCheckedOut(opCtx, request, invocation, sessionOptions, replyBuilder); - } else { - invokeWithNoSession(opCtx, request, invocation, replyBuilder); - } - } +Future RunCommandImpl::_epilogue() { + auto execContext = _ecd->getExecutionContext(); + auto opCtx = execContext->getOpCtx(); + auto& request = execContext->getRequest(); + auto command = execContext->getCommand(); + auto replyBuilder = execContext->getReplyBuilder(); + auto& behaviors = *execContext->behaviors; // This fail point blocks all commands which are running on the specified namespace, or which // are present in the given list of commands.If no namespace or command list are provided,then @@ -946,7 +971,7 @@ bool runCommandImpl(OperationContext* opCtx, // If 'ns' or 'commands' is not set, block for all the namespaces or commands // respectively. - return (ns.empty() || invocation->ns().ns() == ns) && + return (ns.empty() || _ecd->getInvocation()->ns().ns() == ns) && (commands.empty() || std::any_of(commands.begin(), commands.end(), [&request](auto& element) { return element.valueStringDataSafe() == request.getCommandName(); @@ -977,76 +1002,208 @@ bool runCommandImpl(OperationContext* opCtx, if (response.hasField("writeConcernError")) { wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt()); } - appendErrorLabelsAndTopologyVersion( - opCtx, &body, sessionOptions, command->getName(), code, wcCode, isInternalClient); + appendErrorLabelsAndTopologyVersion(opCtx, + &body, + _ecd->getSessionOptions(), + command->getName(), + code, + wcCode, + _isInternalClient()); } auto commandBodyBob = replyBuilder->getBodyBuilder(); behaviors.appendReplyMetadata(opCtx, request, &commandBodyBob); - appendClusterAndOperationTime(opCtx, &commandBodyBob, &commandBodyBob, startOperationTime); + appendClusterAndOperationTime( + opCtx, &commandBodyBob, &commandBodyBob, _ecd->getStartOperationTime()); return ok; } -class ExecCommandDatabase final { -public: - explicit ExecCommandDatabase(std::shared_ptr execContext) - : _isInternalClient(execContext->session() && - execContext->session()->getTags() & - transport::Session::kInternalClient), - _execContext(std::move(execContext)) {} - - /** - * Returns a future that executes a command after stripping metadata, performing authorization - * checks, handling audit impersonation, and (potentially) setting maintenance mode. The future - * also checks that the command is permissible to run on the node given its current replication - * state. All the logic here is independent of any particular command; any functionality - * relevant to a specific command should be confined to its run() method. - */ - static Future run(std::shared_ptr execContext) { - auto opCtx = execContext->getOpCtx(); - auto command = execContext->getCommand(); - auto& request = execContext->getRequest(); +Future RunCommandImpl::_runCommand() { + auto execContext = _ecd->getExecutionContext(); + invariant(!_shouldWaitForWriteConcern); + execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern( + execContext->getRequest().body); + if (_shouldCheckOutSession) { + return invokeWithSessionCheckedOut(_ecd); + } else { + return invokeWithNoSession(_ecd); + } + return Status::OK(); +} - auto instance = std::make_shared(std::move(execContext)); +void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) { + auto invocation = _rci->_ecd->getInvocation().get(); + auto opCtx = _execContext->getOpCtx(); + if (auto scoped = failCommand.scopedIf([&](const BSONObj& obj) { + return CommandHelpers::shouldActivateFailCommandFailPoint( + obj, invocation, opCtx->getClient()) && + obj.hasField("writeConcernError"); + }); + MONGO_unlikely(scoped.isActive())) { + const BSONObj& data = scoped.getData(); + bb.append(data["writeConcernError"]); + if (data.hasField(kErrorLabelsFieldName) && data[kErrorLabelsFieldName].type() == Array) { + // Propagate error labels specified in the failCommand failpoint to the + // OperationContext decoration to override getErrorLabels() behaviors. + invariant(!errorLabelsOverride(opCtx)); + errorLabelsOverride(opCtx).emplace( + data.getObjectField(kErrorLabelsFieldName).getOwned()); + } + return; + } - CommandHelpers::uassertShouldAttemptParse(opCtx, command, request); - instance->_startOperationTime = getClientOperationTime(opCtx); + CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern()); + _execContext->behaviors->waitForWriteConcern(opCtx, invocation, _lastOpBeforeRun.get(), bb); +} - instance->_invocation = command->parse(opCtx, request); - CommandInvocation::set(opCtx, instance->_invocation); +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::run( + std::shared_ptr rci) { + auto instance = std::make_shared(std::move(rci)); + // `_setup()` runs inline as part of preparing the future-chain, which will run the command and + // waits for write concern, and may throw. + instance->_setup(); + auto pf = makePromiseFuture(); + auto future = std::move(pf.future) + .then([instance] { return instance->_run(); }) + .onCompletion([instance](Status status) { + return instance->_onRunCompletion(std::move(status)); + }); + pf.promise.emplaceValue(); + return future; +} - return instance->_initiateCommand() - .then([instance] { return instance->_commandExec(); }) - .onError([instance = std::move(instance)](Status status) mutable { - return instance->_handleFailure(std::move(status)); - }); - } +void RunCommandImpl::RunCommandAndWaitForWriteConcern::_setup() { + auto invocation = _rci->_ecd->getInvocation(); + OperationContext* opCtx = _execContext->getOpCtx(); + const Command* command = invocation->definition(); + const OpMsgRequest& request = _execContext->getRequest(); -private: - // Any logic, such as authorization and auditing, that must precede execution of the command. - Future _initiateCommand(); + _lastOpBeforeRun.emplace(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); - // Returns the future chain that executes the parsed command against the database. - Future _commandExec(); + if (command->getLogicalOp() == LogicalOp::opGetMore) { + // WriteConcern will be set up during command processing, it must not be specified on + // the command body. + _execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(request.body); + } else { + // WriteConcern should always be explicitly specified by operations received on shard + // and config servers, even if it is empty (ie. writeConcern: {}). In this context + // (shard/config servers) an empty WC indicates the operation should use the implicit + // server defaults. So, warn if the operation has not specified writeConcern and is on + // a shard/config server. + if (!opCtx->getClient()->isInDirectClient() && + (!opCtx->inMultiDocumentTransaction() || isTransactionCommand(command->getName()))) { + if (_rci->_isInternalClient()) { + // WriteConcern should always be explicitly specified by operations received + // from internal clients (ie. from a mongos or mongod), even if it is empty + // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }). + uassert( + 4569201, + "received command without explicit writeConcern on an internalClient connection {}"_format( + redact(request.body.toString())), + request.body.hasField(WriteConcernOptions::kWriteConcernField)); + } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || + serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) { + // TODO: Disabled until after SERVER-44539, to avoid log spam. + // LOGV2(21959, "Missing writeConcern on {command}", "Missing " + // "writeConcern on command", "command"_attr = command->getName()); + } + } + } + _extractedWriteConcern.emplace( + uassertStatusOK(extractWriteConcern(opCtx, request.body, _rci->_isInternalClient()))); + if (_rci->_ecd->getSessionOptions().getAutocommit()) { + validateWriteConcernForTransaction(*_extractedWriteConcern, + invocation->definition()->getName()); + } - // Any error-handling logic that must be performed if the command initiation/execution fails. - void _handleFailure(Status status); + // Ensure that the WC being set on the opCtx has provenance. + invariant(_extractedWriteConcern->getProvenance().hasSource(), + fmt::format("unexpected unset provenance on writeConcern: {}", + _extractedWriteConcern->toBSON().jsonString())); - // An indication on whether the command is initiated by an internal client. - const bool _isInternalClient; + opCtx->setWriteConcern(*_extractedWriteConcern); + } +} - std::shared_ptr const _execContext; +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() { + if (auto scoped = failWithErrorCodeInRunCommand.scoped(); MONGO_unlikely(scoped.isActive())) { + const auto errorCode = scoped.getData()["errorCode"].numberInt(); + LOGV2(21960, + "failWithErrorCodeInRunCommand enabled - failing command with error " + "code: {errorCode}", + "failWithErrorCodeInRunCommand enabled, failing command", + "errorCode"_attr = errorCode); + BSONObjBuilder errorBuilder; + errorBuilder.append("ok", 0.0); + errorBuilder.append("code", errorCode); + errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled."); + _execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj()); + return Status::OK(); + } + if (_rci->_shouldCheckOutSession) + return invokeWithSessionCheckedOut(_rci->_ecd); + return invokeWithNoSession(_rci->_ecd); +} - // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share - // execution state without concerning the lifetime of these variables. In particular, `_scoped` - // is a scoped variable that once created, lives as long as its owner (i.e., an instance of - // `ExecCommandDatabase`) lives. - BSONObjBuilder _extraFieldsBuilder; - std::shared_ptr _invocation; - LogicalTime _startOperationTime; - OperationSessionInfoFromClient _sessionOptions; - std::unique_ptr _scoped; -}; +Future RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) { + auto opCtx = _execContext->getOpCtx(); + if (!status.isOK()) { + // Do no-op write before returning NoSuchTransaction if command has writeConcern. + if (status.code() == ErrorCodes::NoSuchTransaction && + !opCtx->getWriteConcern().usedDefault) { + TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error"); + } + _waitForWriteConcern(*_rci->_ecd->getExtraFieldsBuilder()); + return status; + } + + auto bb = _execContext->getReplyBuilder()->getBodyBuilder(); + _waitForWriteConcern(bb); + + // With the exception of getMores inheriting the WriteConcern from the originating command, + // nothing in run() should change the writeConcern. + if (_execContext->getCommand()->getLogicalOp() == LogicalOp::opGetMore) { + dassert(!_extractedWriteConcern, "opGetMore contained unexpected extracted write concern"); + } else { + dassert(_extractedWriteConcern, "no extracted write concern"); + dassert( + opCtx->getWriteConcern() == _extractedWriteConcern, + "opCtx wc: {} extracted wc: {}"_format(opCtx->getWriteConcern().toBSON().jsonString(), + _extractedWriteConcern->toBSON().jsonString())); + } + return status; +} + +Future RunCommandImpl::_makeFutureChain() { + return _prologue() + .then([this] { + if (_shouldWaitForWriteConcern) + return RunCommandAndWaitForWriteConcern::run(shared_from_this()); + else + return _runCommand(); + }) + .then([this] { return _epilogue(); }) + .onCompletion( + [this, anchor = shared_from_this()](StatusWith ranSuccessfully) -> Future { + // Failure to run a command is either indicated by throwing an exception or adding a + // non-okay field to the replyBuilder. The input argument (i.e., `ranSuccessfully`) + // captures both cases. On success, it holds an okay status and a `true` value. + auto status = ranSuccessfully.getStatus(); + if (status.isOK() && ranSuccessfully.getValue()) + return Status::OK(); + + auto execContext = _ecd->getExecutionContext(); + execContext->getCommand()->incrementCommandsFailed(); + if (status.code() == ErrorCodes::Unauthorized) { + CommandHelpers::auditLogAuthEvent(execContext->getOpCtx(), + _ecd->getInvocation().get(), + execContext->getRequest(), + status.code()); + } + return status; + }); +} Future ExecCommandDatabase::_initiateCommand() try { auto opCtx = _execContext->getOpCtx(); @@ -1122,7 +1279,7 @@ Future ExecCommandDatabase::_initiateCommand() try { } else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) { uassert(ErrorCodes::InvalidOptions, "Can not specify maxTimeMSOpOnly for non internal clients", - _isInternalClient); + _isInternalClient()); maxTimeMSOpOnlyField = element; } else if (fieldName == "allowImplicitCollectionCreation") { allowImplicitCollectionCreationField = element; @@ -1247,7 +1404,7 @@ Future ExecCommandDatabase::_initiateCommand() try { bool startTransaction = static_cast(_sessionOptions.getStartTransaction()); if (!skipReadConcern) { auto newReadConcernArgs = uassertStatusOK(_extractReadConcern( - opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient)); + opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient())); // Ensure that the RC being set on the opCtx has provenance. invariant(newReadConcernArgs.getProvenance().hasSource(), @@ -1334,25 +1491,8 @@ Future ExecCommandDatabase::_initiateCommand() try { return ex.toStatus(); } -Future ExecCommandDatabase::_commandExec() try { - if (!runCommandImpl(_execContext->getOpCtx(), - _invocation.get(), - _execContext->getRequest(), - _execContext->getReplyBuilder(), - _startOperationTime, - *_execContext->behaviors, - &_extraFieldsBuilder, - _sessionOptions)) { - _execContext->getCommand()->incrementCommandsFailed(); - } - return Status::OK(); -} catch (const DBException& ex) { - _execContext->getCommand()->incrementCommandsFailed(); - if (ex.code() == ErrorCodes::Unauthorized) { - CommandHelpers::auditLogAuthEvent( - _execContext->getOpCtx(), _invocation.get(), _execContext->getRequest(), ex.code()); - } - return ex.toStatus(); +Future ExecCommandDatabase::_commandExec() { + return RunCommandImpl::run(shared_from_this()); } void ExecCommandDatabase::_handleFailure(Status status) { @@ -1380,7 +1520,7 @@ void ExecCommandDatabase::_handleFailure(Status status) { command->getName(), status.code(), wcCode, - _isInternalClient); + _isInternalClient()); BSONObjBuilder metadataBob; behaviors.appendReplyMetadata(opCtx, request, &metadataBob); @@ -1389,8 +1529,11 @@ void ExecCommandDatabase::_handleFailure(Status status) { // it here, so if it is valid it can be used to compute the proper operationTime. auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); if (readConcernArgs.isEmpty()) { - auto readConcernArgsStatus = _extractReadConcern( - opCtx, _invocation.get(), request.body, false /*startTransaction*/, _isInternalClient); + auto readConcernArgsStatus = _extractReadConcern(opCtx, + _invocation.get(), + request.body, + false /*startTransaction*/, + _isInternalClient()); if (readConcernArgsStatus.isOK()) { // We must obtain the client lock to set the ReadConcernArgs on the operation context as // it may be concurrently read by CurrentOp. -- cgit v1.2.1