summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-14 17:17:30 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-14 17:17:30 +0000
commit1d6af89487957dfa75835b60f4c09e8c3cdf5cd5 (patch)
tree2181619d94e7548978af82952df6d934492a8ed9
parent38f350478add12941539ec6f44034d5ba4443265 (diff)
downloadmongo-1d6af89487957dfa75835b60f4c09e8c3cdf5cd5.tar.gz
SERVER-49107 Futurize and refactor runCommandImpl()
-rw-r--r--src/mongo/db/service_entry_point_common.cpp627
1 files changed, 385 insertions, 242 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index d586c76c845..58aec2a488b 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -590,22 +590,184 @@ void _abortUnpreparedOrStashPreparedTransaction(
}
}
-void invokeWithNoSession(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- rpc::ReplyBuilderInterface* replyBuilder) {
+class ExecCommandDatabase : public std::enable_shared_from_this<ExecCommandDatabase> {
+public:
+ explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext)
+ : _execContext(std::move(execContext)) {}
+
+ static Future<void> run(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ return std::make_shared<ExecCommandDatabase>(std::move(execContext))->_makeFutureChain();
+ }
+
+ std::shared_ptr<HandleRequest::ExecutionContext> getExecutionContext() {
+ return _execContext;
+ }
+ std::shared_ptr<CommandInvocation> getInvocation() {
+ return _invocation;
+ }
+ const OperationSessionInfoFromClient& getSessionOptions() const {
+ return _sessionOptions;
+ }
+ BSONObjBuilder* getExtraFieldsBuilder() {
+ return &_extraFieldsBuilder;
+ }
+ const LogicalTime& getStartOperationTime() const {
+ return _startOperationTime;
+ }
+
+private:
+ // Returns a future that executes a command after stripping metadata, performing authorization
+ // checks, handling audit impersonation, and (potentially) setting maintenance mode. The future
+ // also checks that the command is permissible to run on the node given its current replication
+ // state. All the logic here is independent of any particular command; any functionality
+ // relevant to a specific command should be confined to its run() method.
+ Future<void> _makeFutureChain() {
+ return _parseCommand().then([this, anchor = shared_from_this()] {
+ return _initiateCommand()
+ .then([this] { return _commandExec(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ return _handleFailure(std::move(status));
+ });
+ });
+ }
+
+ Future<void> _parseCommand() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future).then([this, anchor = shared_from_this()] {
+ auto opCtx = _execContext->getOpCtx();
+ auto command = _execContext->getCommand();
+ auto& request = _execContext->getRequest();
+
+ CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
+ _startOperationTime = getClientOperationTime(opCtx);
+
+ _invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, _invocation);
+ });
+ pf.promise.emplaceValue();
+ return future;
+ }
+
+ // Any logic, such as authorization and auditing, that must precede execution of the command.
+ Future<void> _initiateCommand();
+
+ // Returns the future chain that executes the parsed command against the database.
+ Future<void> _commandExec();
+
+ // Any error-handling logic that must be performed if the command initiation/execution fails.
+ void _handleFailure(Status status);
+
+ bool _isInternalClient() const {
+ return _execContext->session() &&
+ _execContext->session()->getTags() & transport::Session::kInternalClient;
+ }
+
+ const std::shared_ptr<HandleRequest::ExecutionContext> _execContext;
+
+ // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share
+ // execution state without concerning the lifetime of these variables.
+ BSONObjBuilder _extraFieldsBuilder;
+ std::shared_ptr<CommandInvocation> _invocation;
+ LogicalTime _startOperationTime;
+ OperationSessionInfoFromClient _sessionOptions;
+ std::unique_ptr<PolymorphicScoped> _scoped;
+};
+
+class RunCommandImpl : public std::enable_shared_from_this<RunCommandImpl> {
+public:
+ explicit RunCommandImpl(std::shared_ptr<ExecCommandDatabase> ecd)
+ : _ecd(std::move(ecd)),
+ _shouldCheckOutSession(
+ _ecd->getSessionOptions().getTxnNumber() &&
+ !shouldCommandSkipSessionCheckout(_ecd->getInvocation()->definition()->getName())),
+ _shouldWaitForWriteConcern(_ecd->getInvocation()->supportsWriteConcern() ||
+ _ecd->getInvocation()->definition()->getLogicalOp() ==
+ LogicalOp::opGetMore) {}
+
+ static Future<void> run(std::shared_ptr<ExecCommandDatabase> ecd) {
+ return std::make_shared<RunCommandImpl>(std::move(ecd))->_makeFutureChain();
+ }
+
+private:
+ Future<void> _makeFutureChain();
+
+ // Anchor for references to attributes defined in `ExecCommandDatabase` (e.g., sessionOptions).
+ const std::shared_ptr<ExecCommandDatabase> _ecd;
+
+ // Any code that must run before command execution (e.g., reserving bytes for reply builder).
+ Future<void> _prologue();
+
+ // Runs the command without waiting for write concern
+ Future<void> _runCommand();
+
+ class RunCommandAndWaitForWriteConcern {
+ public:
+ explicit RunCommandAndWaitForWriteConcern(std::shared_ptr<RunCommandImpl> rci)
+ : _rci(std::move(rci)),
+ _execContext(_rci->_ecd->getExecutionContext()),
+ _oldWriteConcern(_execContext->getOpCtx()->getWriteConcern()) {}
+
+ ~RunCommandAndWaitForWriteConcern() {
+ _execContext->getOpCtx()->setWriteConcern(_oldWriteConcern);
+ }
+
+ static Future<void> run(std::shared_ptr<RunCommandImpl>);
+
+ private:
+ void _waitForWriteConcern(BSONObjBuilder& bb);
+
+ void _setup();
+ Future<void> _run();
+ Future<void> _onRunCompletion(Status);
+
+ const std::shared_ptr<RunCommandImpl> _rci;
+ const std::shared_ptr<HandleRequest::ExecutionContext> _execContext;
+
+ // Allows changing the write concern while running the command and resetting on destruction.
+ const WriteConcernOptions _oldWriteConcern;
+ boost::optional<repl::OpTime> _lastOpBeforeRun;
+ boost::optional<WriteConcernOptions> _extractedWriteConcern;
+ };
+
+ // Any code that must run after command execution -- returns true on successful execution.
+ Future<bool> _epilogue();
+
+ bool _isInternalClient() const {
+ auto session = _ecd->getExecutionContext()->session();
+ return session && session->getTags() & transport::Session::kInternalClient;
+ }
+
+ // Whether invoking the command requires a session to be checked out.
+ const bool _shouldCheckOutSession;
+
+ // getMore operations inherit a WriteConcern from their originating cursor. For example, if the
+ // originating command was an aggregate with a $out and batchSize: 0. Note that if the command
+ // only performed reads then we will not need to wait at all.
+ const bool _shouldWaitForWriteConcern;
+};
+
+Future<void> invokeWithNoSession(std::shared_ptr<ExecCommandDatabase> ecd) {
+ auto execContext = ecd->getExecutionContext();
+ OperationContext* opCtx = execContext->getOpCtx();
+ const OpMsgRequest& request = execContext->getRequest();
+ CommandInvocation* invocation = ecd->getInvocation().get();
+ rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
tenant_migration_donor::migrationConflictHandler(
opCtx,
request.getDatabase(),
[&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
replyBuilder);
+ return Status::OK();
}
-void invokeWithSessionCheckedOut(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- const OperationSessionInfoFromClient& sessionOptions,
- rpc::ReplyBuilderInterface* replyBuilder) {
+Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ecd) {
+ auto execContext = ecd->getExecutionContext();
+ OperationContext* opCtx = execContext->getOpCtx();
+ const OpMsgRequest& request = execContext->getRequest();
+ CommandInvocation* invocation = ecd->getInvocation().get();
+ const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions();
+ rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
+
// This constructor will check out the session. It handles the appropriate state management
// for both multi-statement transactions and retryable writes. Currently, only requests with
// a transaction number will check out the session.
@@ -742,7 +904,7 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) {
// If ok is present, use its truthiness.
if (!okField.trueValue()) {
- return;
+ return Status::OK();
}
}
@@ -756,17 +918,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
auto bodyBuilder = replyBuilder->getBodyBuilder();
txnResponseMetadata.serialize(&bodyBuilder);
}
+ return Status::OK();
}
-bool runCommandImpl(OperationContext* opCtx,
- CommandInvocation* invocation,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder,
- LogicalTime startOperationTime,
- const ServiceEntryPointCommon::Hooks& behaviors,
- BSONObjBuilder* extraFieldsBuilder,
- const OperationSessionInfoFromClient& sessionOptions) {
- const Command* command = invocation->definition();
+Future<void> RunCommandImpl::_prologue() try {
+ auto execContext = _ecd->getExecutionContext();
+ auto opCtx = execContext->getOpCtx();
+ const Command* command = _ecd->getInvocation()->definition();
auto bytesToReserve = command->reserveBytesForReply();
// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
@@ -775,19 +933,7 @@ bool runCommandImpl(OperationContext* opCtx,
if (kDebugBuild)
bytesToReserve = 0;
#endif
- replyBuilder->reserveBytes(bytesToReserve);
-
- const auto isInternalClient = opCtx->getClient()->session() &&
- (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
-
- const bool shouldCheckOutSession =
- sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName());
-
- // getMore operations inherit a WriteConcern from their originating cursor. For example, if the
- // originating command was an aggregate with a $out and batchSize: 0. Note that if the command
- // only performed reads then we will not need to wait at all.
- const bool shouldWaitForWriteConcern =
- invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore;
+ execContext->getReplyBuilder()->reserveBytes(bytesToReserve);
// Record readConcern usages for commands run outside of transactions, excluding DBDirectClient.
// For commands inside a transaction, they inherit the readConcern from the transaction. So we
@@ -797,139 +943,18 @@ bool runCommandImpl(OperationContext* opCtx,
ServerReadConcernMetrics::get(opCtx)->recordReadConcern(repl::ReadConcernArgs::get(opCtx),
false /* isTransaction */);
}
+ return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
+}
- if (shouldWaitForWriteConcern) {
- auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-
- // Change the write concern while running the command.
- const auto oldWC = opCtx->getWriteConcern();
- ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
-
- boost::optional<WriteConcernOptions> extractedWriteConcern;
- if (command->getLogicalOp() == LogicalOp::opGetMore) {
- // WriteConcern will be set up during command processing, it must not be specified on
- // the command body.
- behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- } else {
- // WriteConcern should always be explicitly specified by operations received on shard
- // and config servers, even if it is empty (ie. writeConcern: {}). In this context
- // (shard/config servers) an empty WC indicates the operation should use the implicit
- // server defaults. So, warn if the operation has not specified writeConcern and is on
- // a shard/config server.
- if (!opCtx->getClient()->isInDirectClient() &&
- (!opCtx->inMultiDocumentTransaction() ||
- isTransactionCommand(command->getName()))) {
- if (isInternalClient) {
- // WriteConcern should always be explicitly specified by operations received
- // from internal clients (ie. from a mongos or mongod), even if it is empty
- // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }).
- uassert(
- 4569201,
- "received command without explicit writeConcern on an internalClient connection {}"_format(
- redact(request.body.toString())),
- request.body.hasField(WriteConcernOptions::kWriteConcernField));
- } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
- // TODO: Disabled until after SERVER-44539, to avoid log spam.
- // LOGV2(21959, "Missing writeConcern on {command}", "Missing "
- // "writeConcern on command", "command"_attr = command->getName());
- }
- }
- }
- extractedWriteConcern.emplace(
- uassertStatusOK(extractWriteConcern(opCtx, request.body, isInternalClient)));
- if (sessionOptions.getAutocommit()) {
- validateWriteConcernForTransaction(*extractedWriteConcern,
- invocation->definition()->getName());
- }
-
- // Ensure that the WC being set on the opCtx has provenance.
- invariant(extractedWriteConcern->getProvenance().hasSource(),
- str::stream() << "unexpected unset provenance on writeConcern: "
- << extractedWriteConcern->toBSON());
-
- opCtx->setWriteConcern(*extractedWriteConcern);
- }
-
- auto waitForWriteConcern = [&](auto&& bb) {
- bool reallyWait = true;
- failCommand.executeIf(
- [&](const BSONObj& data) {
- bb.append(data["writeConcernError"]);
- reallyWait = false;
- if (data.hasField(kErrorLabelsFieldName) &&
- data[kErrorLabelsFieldName].type() == Array) {
- // Propagate error labels specified in the failCommand failpoint to the
- // OperationContext decoration to override getErrorLabels() behaviors.
- invariant(!errorLabelsOverride(opCtx));
- errorLabelsOverride(opCtx).emplace(
- data.getObjectField(kErrorLabelsFieldName).getOwned());
- }
- },
- [&](const BSONObj& data) {
- return CommandHelpers::shouldActivateFailCommandFailPoint(
- data, invocation, opCtx->getClient()) &&
- data.hasField("writeConcernError");
- });
- if (reallyWait) {
- CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern());
- behaviors.waitForWriteConcern(opCtx, invocation, lastOpBeforeRun, bb);
- }
- };
-
- try {
- if (auto scoped = failWithErrorCodeInRunCommand.scoped();
- MONGO_unlikely(scoped.isActive())) {
- const auto errorCode = scoped.getData()["errorCode"].numberInt();
- LOGV2(21960,
- "failWithErrorCodeInRunCommand enabled - failing command with error "
- "code: {errorCode}",
- "failWithErrorCodeInRunCommand enabled, failing command",
- "errorCode"_attr = errorCode);
- BSONObjBuilder errorBuilder;
- errorBuilder.append("ok", 0.0);
- errorBuilder.append("code", errorCode);
- errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled.");
- replyBuilder->setCommandReply(errorBuilder.obj());
- } else if (shouldCheckOutSession) {
- invokeWithSessionCheckedOut(
- opCtx, request, invocation, sessionOptions, replyBuilder);
- } else {
- invokeWithNoSession(opCtx, request, invocation, replyBuilder);
- }
- } catch (const DBException& ex) {
- // Do no-op write before returning NoSuchTransaction if command has writeConcern.
- if (ex.toStatus().code() == ErrorCodes::NoSuchTransaction &&
- !opCtx->getWriteConcern().usedDefault) {
- TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error");
- }
- waitForWriteConcern(*extraFieldsBuilder);
- throw;
- }
-
- waitForWriteConcern(replyBuilder->getBodyBuilder());
-
- // With the exception of getMores inheriting the WriteConcern from the originating command,
- // nothing in run() should change the writeConcern.
- if (command->getLogicalOp() == LogicalOp::opGetMore) {
- dassert(!extractedWriteConcern,
- "opGetMore contained unexpected extracted write concern");
- } else {
- dassert(extractedWriteConcern, "no extracted write concern");
- dassert(opCtx->getWriteConcern() == extractedWriteConcern,
- "opCtx wc: {} extracted wc: {}"_format(
- opCtx->getWriteConcern().toBSON().jsonString(),
- extractedWriteConcern->toBSON().jsonString()));
- }
- } else {
- behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- if (shouldCheckOutSession) {
- invokeWithSessionCheckedOut(opCtx, request, invocation, sessionOptions, replyBuilder);
- } else {
- invokeWithNoSession(opCtx, request, invocation, replyBuilder);
- }
- }
+Future<bool> RunCommandImpl::_epilogue() {
+ auto execContext = _ecd->getExecutionContext();
+ auto opCtx = execContext->getOpCtx();
+ auto& request = execContext->getRequest();
+ auto command = execContext->getCommand();
+ auto replyBuilder = execContext->getReplyBuilder();
+ auto& behaviors = *execContext->behaviors;
// This fail point blocks all commands which are running on the specified namespace, or which
// are present in the given list of commands.If no namespace or command list are provided,then
@@ -946,7 +971,7 @@ bool runCommandImpl(OperationContext* opCtx,
// If 'ns' or 'commands' is not set, block for all the namespaces or commands
// respectively.
- return (ns.empty() || invocation->ns().ns() == ns) &&
+ return (ns.empty() || _ecd->getInvocation()->ns().ns() == ns) &&
(commands.empty() ||
std::any_of(commands.begin(), commands.end(), [&request](auto& element) {
return element.valueStringDataSafe() == request.getCommandName();
@@ -977,76 +1002,208 @@ bool runCommandImpl(OperationContext* opCtx,
if (response.hasField("writeConcernError")) {
wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());
}
- appendErrorLabelsAndTopologyVersion(
- opCtx, &body, sessionOptions, command->getName(), code, wcCode, isInternalClient);
+ appendErrorLabelsAndTopologyVersion(opCtx,
+ &body,
+ _ecd->getSessionOptions(),
+ command->getName(),
+ code,
+ wcCode,
+ _isInternalClient());
}
auto commandBodyBob = replyBuilder->getBodyBuilder();
behaviors.appendReplyMetadata(opCtx, request, &commandBodyBob);
- appendClusterAndOperationTime(opCtx, &commandBodyBob, &commandBodyBob, startOperationTime);
+ appendClusterAndOperationTime(
+ opCtx, &commandBodyBob, &commandBodyBob, _ecd->getStartOperationTime());
return ok;
}
-class ExecCommandDatabase final {
-public:
- explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext)
- : _isInternalClient(execContext->session() &&
- execContext->session()->getTags() &
- transport::Session::kInternalClient),
- _execContext(std::move(execContext)) {}
-
- /**
- * Returns a future that executes a command after stripping metadata, performing authorization
- * checks, handling audit impersonation, and (potentially) setting maintenance mode. The future
- * also checks that the command is permissible to run on the node given its current replication
- * state. All the logic here is independent of any particular command; any functionality
- * relevant to a specific command should be confined to its run() method.
- */
- static Future<void> run(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
- auto opCtx = execContext->getOpCtx();
- auto command = execContext->getCommand();
- auto& request = execContext->getRequest();
+Future<void> RunCommandImpl::_runCommand() {
+ auto execContext = _ecd->getExecutionContext();
+ invariant(!_shouldWaitForWriteConcern);
+ execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(
+ execContext->getRequest().body);
+ if (_shouldCheckOutSession) {
+ return invokeWithSessionCheckedOut(_ecd);
+ } else {
+ return invokeWithNoSession(_ecd);
+ }
+ return Status::OK();
+}
- auto instance = std::make_shared<ExecCommandDatabase>(std::move(execContext));
+void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) {
+ auto invocation = _rci->_ecd->getInvocation().get();
+ auto opCtx = _execContext->getOpCtx();
+ if (auto scoped = failCommand.scopedIf([&](const BSONObj& obj) {
+ return CommandHelpers::shouldActivateFailCommandFailPoint(
+ obj, invocation, opCtx->getClient()) &&
+ obj.hasField("writeConcernError");
+ });
+ MONGO_unlikely(scoped.isActive())) {
+ const BSONObj& data = scoped.getData();
+ bb.append(data["writeConcernError"]);
+ if (data.hasField(kErrorLabelsFieldName) && data[kErrorLabelsFieldName].type() == Array) {
+ // Propagate error labels specified in the failCommand failpoint to the
+ // OperationContext decoration to override getErrorLabels() behaviors.
+ invariant(!errorLabelsOverride(opCtx));
+ errorLabelsOverride(opCtx).emplace(
+ data.getObjectField(kErrorLabelsFieldName).getOwned());
+ }
+ return;
+ }
- CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);
- instance->_startOperationTime = getClientOperationTime(opCtx);
+ CurOp::get(opCtx)->debug().writeConcern.emplace(opCtx->getWriteConcern());
+ _execContext->behaviors->waitForWriteConcern(opCtx, invocation, _lastOpBeforeRun.get(), bb);
+}
- instance->_invocation = command->parse(opCtx, request);
- CommandInvocation::set(opCtx, instance->_invocation);
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::run(
+ std::shared_ptr<RunCommandImpl> rci) {
+ auto instance = std::make_shared<RunCommandAndWaitForWriteConcern>(std::move(rci));
+ // `_setup()` runs inline as part of preparing the future-chain, which will run the command and
+ // waits for write concern, and may throw.
+ instance->_setup();
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([instance] { return instance->_run(); })
+ .onCompletion([instance](Status status) {
+ return instance->_onRunCompletion(std::move(status));
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
- return instance->_initiateCommand()
- .then([instance] { return instance->_commandExec(); })
- .onError([instance = std::move(instance)](Status status) mutable {
- return instance->_handleFailure(std::move(status));
- });
- }
+void RunCommandImpl::RunCommandAndWaitForWriteConcern::_setup() {
+ auto invocation = _rci->_ecd->getInvocation();
+ OperationContext* opCtx = _execContext->getOpCtx();
+ const Command* command = invocation->definition();
+ const OpMsgRequest& request = _execContext->getRequest();
-private:
- // Any logic, such as authorization and auditing, that must precede execution of the command.
- Future<void> _initiateCommand();
+ _lastOpBeforeRun.emplace(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
- // Returns the future chain that executes the parsed command against the database.
- Future<void> _commandExec();
+ if (command->getLogicalOp() == LogicalOp::opGetMore) {
+ // WriteConcern will be set up during command processing, it must not be specified on
+ // the command body.
+ _execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(request.body);
+ } else {
+ // WriteConcern should always be explicitly specified by operations received on shard
+ // and config servers, even if it is empty (ie. writeConcern: {}). In this context
+ // (shard/config servers) an empty WC indicates the operation should use the implicit
+ // server defaults. So, warn if the operation has not specified writeConcern and is on
+ // a shard/config server.
+ if (!opCtx->getClient()->isInDirectClient() &&
+ (!opCtx->inMultiDocumentTransaction() || isTransactionCommand(command->getName()))) {
+ if (_rci->_isInternalClient()) {
+ // WriteConcern should always be explicitly specified by operations received
+ // from internal clients (ie. from a mongos or mongod), even if it is empty
+ // (ie. writeConcern: {}, which is equivalent to { w: 1, wtimeout: 0 }).
+ uassert(
+ 4569201,
+ "received command without explicit writeConcern on an internalClient connection {}"_format(
+ redact(request.body.toString())),
+ request.body.hasField(WriteConcernOptions::kWriteConcernField));
+ } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (!request.body.hasField(WriteConcernOptions::kWriteConcernField)) {
+ // TODO: Disabled until after SERVER-44539, to avoid log spam.
+ // LOGV2(21959, "Missing writeConcern on {command}", "Missing "
+ // "writeConcern on command", "command"_attr = command->getName());
+ }
+ }
+ }
+ _extractedWriteConcern.emplace(
+ uassertStatusOK(extractWriteConcern(opCtx, request.body, _rci->_isInternalClient())));
+ if (_rci->_ecd->getSessionOptions().getAutocommit()) {
+ validateWriteConcernForTransaction(*_extractedWriteConcern,
+ invocation->definition()->getName());
+ }
- // Any error-handling logic that must be performed if the command initiation/execution fails.
- void _handleFailure(Status status);
+ // Ensure that the WC being set on the opCtx has provenance.
+ invariant(_extractedWriteConcern->getProvenance().hasSource(),
+ fmt::format("unexpected unset provenance on writeConcern: {}",
+ _extractedWriteConcern->toBSON().jsonString()));
- // An indication on whether the command is initiated by an internal client.
- const bool _isInternalClient;
+ opCtx->setWriteConcern(*_extractedWriteConcern);
+ }
+}
- std::shared_ptr<HandleRequest::ExecutionContext> const _execContext;
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() {
+ if (auto scoped = failWithErrorCodeInRunCommand.scoped(); MONGO_unlikely(scoped.isActive())) {
+ const auto errorCode = scoped.getData()["errorCode"].numberInt();
+ LOGV2(21960,
+ "failWithErrorCodeInRunCommand enabled - failing command with error "
+ "code: {errorCode}",
+ "failWithErrorCodeInRunCommand enabled, failing command",
+ "errorCode"_attr = errorCode);
+ BSONObjBuilder errorBuilder;
+ errorBuilder.append("ok", 0.0);
+ errorBuilder.append("code", errorCode);
+ errorBuilder.append("errmsg", "failWithErrorCodeInRunCommand enabled.");
+ _execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj());
+ return Status::OK();
+ }
+ if (_rci->_shouldCheckOutSession)
+ return invokeWithSessionCheckedOut(_rci->_ecd);
+ return invokeWithNoSession(_rci->_ecd);
+}
- // The following allows `_initiateCommand`, `_commandExec`, and `_handleFailure` to share
- // execution state without concerning the lifetime of these variables. In particular, `_scoped`
- // is a scoped variable that once created, lives as long as its owner (i.e., an instance of
- // `ExecCommandDatabase`) lives.
- BSONObjBuilder _extraFieldsBuilder;
- std::shared_ptr<CommandInvocation> _invocation;
- LogicalTime _startOperationTime;
- OperationSessionInfoFromClient _sessionOptions;
- std::unique_ptr<PolymorphicScoped> _scoped;
-};
+Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) {
+ auto opCtx = _execContext->getOpCtx();
+ if (!status.isOK()) {
+ // Do no-op write before returning NoSuchTransaction if command has writeConcern.
+ if (status.code() == ErrorCodes::NoSuchTransaction &&
+ !opCtx->getWriteConcern().usedDefault) {
+ TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error");
+ }
+ _waitForWriteConcern(*_rci->_ecd->getExtraFieldsBuilder());
+ return status;
+ }
+
+ auto bb = _execContext->getReplyBuilder()->getBodyBuilder();
+ _waitForWriteConcern(bb);
+
+ // With the exception of getMores inheriting the WriteConcern from the originating command,
+ // nothing in run() should change the writeConcern.
+ if (_execContext->getCommand()->getLogicalOp() == LogicalOp::opGetMore) {
+ dassert(!_extractedWriteConcern, "opGetMore contained unexpected extracted write concern");
+ } else {
+ dassert(_extractedWriteConcern, "no extracted write concern");
+ dassert(
+ opCtx->getWriteConcern() == _extractedWriteConcern,
+ "opCtx wc: {} extracted wc: {}"_format(opCtx->getWriteConcern().toBSON().jsonString(),
+ _extractedWriteConcern->toBSON().jsonString()));
+ }
+ return status;
+}
+
+Future<void> RunCommandImpl::_makeFutureChain() {
+ return _prologue()
+ .then([this] {
+ if (_shouldWaitForWriteConcern)
+ return RunCommandAndWaitForWriteConcern::run(shared_from_this());
+ else
+ return _runCommand();
+ })
+ .then([this] { return _epilogue(); })
+ .onCompletion(
+ [this, anchor = shared_from_this()](StatusWith<bool> ranSuccessfully) -> Future<void> {
+ // Failure to run a command is either indicated by throwing an exception or adding a
+ // non-okay field to the replyBuilder. The input argument (i.e., `ranSuccessfully`)
+ // captures both cases. On success, it holds an okay status and a `true` value.
+ auto status = ranSuccessfully.getStatus();
+ if (status.isOK() && ranSuccessfully.getValue())
+ return Status::OK();
+
+ auto execContext = _ecd->getExecutionContext();
+ execContext->getCommand()->incrementCommandsFailed();
+ if (status.code() == ErrorCodes::Unauthorized) {
+ CommandHelpers::auditLogAuthEvent(execContext->getOpCtx(),
+ _ecd->getInvocation().get(),
+ execContext->getRequest(),
+ status.code());
+ }
+ return status;
+ });
+}
Future<void> ExecCommandDatabase::_initiateCommand() try {
auto opCtx = _execContext->getOpCtx();
@@ -1122,7 +1279,7 @@ Future<void> ExecCommandDatabase::_initiateCommand() try {
} else if (fieldName == QueryRequest::kMaxTimeMSOpOnlyField) {
uassert(ErrorCodes::InvalidOptions,
"Can not specify maxTimeMSOpOnly for non internal clients",
- _isInternalClient);
+ _isInternalClient());
maxTimeMSOpOnlyField = element;
} else if (fieldName == "allowImplicitCollectionCreation") {
allowImplicitCollectionCreationField = element;
@@ -1247,7 +1404,7 @@ Future<void> ExecCommandDatabase::_initiateCommand() try {
bool startTransaction = static_cast<bool>(_sessionOptions.getStartTransaction());
if (!skipReadConcern) {
auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(
- opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient));
+ opCtx, _invocation.get(), request.body, startTransaction, _isInternalClient()));
// Ensure that the RC being set on the opCtx has provenance.
invariant(newReadConcernArgs.getProvenance().hasSource(),
@@ -1334,25 +1491,8 @@ Future<void> ExecCommandDatabase::_initiateCommand() try {
return ex.toStatus();
}
-Future<void> ExecCommandDatabase::_commandExec() try {
- if (!runCommandImpl(_execContext->getOpCtx(),
- _invocation.get(),
- _execContext->getRequest(),
- _execContext->getReplyBuilder(),
- _startOperationTime,
- *_execContext->behaviors,
- &_extraFieldsBuilder,
- _sessionOptions)) {
- _execContext->getCommand()->incrementCommandsFailed();
- }
- return Status::OK();
-} catch (const DBException& ex) {
- _execContext->getCommand()->incrementCommandsFailed();
- if (ex.code() == ErrorCodes::Unauthorized) {
- CommandHelpers::auditLogAuthEvent(
- _execContext->getOpCtx(), _invocation.get(), _execContext->getRequest(), ex.code());
- }
- return ex.toStatus();
+Future<void> ExecCommandDatabase::_commandExec() {
+ return RunCommandImpl::run(shared_from_this());
}
void ExecCommandDatabase::_handleFailure(Status status) {
@@ -1380,7 +1520,7 @@ void ExecCommandDatabase::_handleFailure(Status status) {
command->getName(),
status.code(),
wcCode,
- _isInternalClient);
+ _isInternalClient());
BSONObjBuilder metadataBob;
behaviors.appendReplyMetadata(opCtx, request, &metadataBob);
@@ -1389,8 +1529,11 @@ void ExecCommandDatabase::_handleFailure(Status status) {
// it here, so if it is valid it can be used to compute the proper operationTime.
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
if (readConcernArgs.isEmpty()) {
- auto readConcernArgsStatus = _extractReadConcern(
- opCtx, _invocation.get(), request.body, false /*startTransaction*/, _isInternalClient);
+ auto readConcernArgsStatus = _extractReadConcern(opCtx,
+ _invocation.get(),
+ request.body,
+ false /*startTransaction*/,
+ _isInternalClient());
if (readConcernArgsStatus.isOK()) {
// We must obtain the client lock to set the ReadConcernArgs on the operation context as
// it may be concurrently read by CurrentOp.