diff options
Diffstat (limited to 'src')
32 files changed, 446 insertions, 219 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index ea62ac51fc0..2350a040b6d 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -45,6 +45,7 @@ #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/explain.h" +#include "mongo/db/read_concern_support_result.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/op_msg.h" @@ -269,6 +270,13 @@ public: virtual std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, const OpMsgRequest& request) = 0; + virtual std::unique_ptr<CommandInvocation> parseForExplain( + OperationContext* opCtx, + const OpMsgRequest& request, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) { + return parse(opCtx, request); + } + /** * Returns the command's name. This value never changes for the lifetime of the command. */ @@ -433,46 +441,6 @@ private: }; /** - * The result of checking an invocation's support for readConcern. There are two parts: - * - Whether or not the invocation supports the given readConcern. - * - Whether or not the invocation permits having the default readConcern applied to it. - */ -struct ReadConcernSupportResult { - /** - * Whether this command invocation supports the requested readConcern level. This only - * applies when running outside transactions because all commands that are allowed to run - * in a transaction must support all the read concerns that can be used in a transaction. - */ - enum class ReadConcern { kSupported, kNotSupported } readConcern; - - /** - * Whether this command invocation supports applying the default readConcern to it. - */ - enum class DefaultReadConcern { kPermitted, kNotPermitted } defaultReadConcern; - - /** - * Construct with either the enum value or a bool, where true indicates - * ReadConcern::kSupported or DefaultReadConcern::kPermitted (as appropriate). - */ - ReadConcernSupportResult(ReadConcern supported, DefaultReadConcern defaultPermitted) - : readConcern(supported), defaultReadConcern(defaultPermitted) {} - - ReadConcernSupportResult(bool supported, DefaultReadConcern defaultPermitted) - : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported), - defaultReadConcern(defaultPermitted) {} - - ReadConcernSupportResult(ReadConcern supported, bool defaultPermitted) - : readConcern(supported), - defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted - : DefaultReadConcern::kNotPermitted) {} - - ReadConcernSupportResult(bool supported, bool defaultPermitted) - : readConcern(supported ? ReadConcern::kSupported : ReadConcern::kNotSupported), - defaultReadConcern(defaultPermitted ? DefaultReadConcern::kPermitted - : DefaultReadConcern::kNotPermitted) {} -}; - -/** * Represents a single invocation of a given command. */ class CommandInvocation { @@ -516,8 +484,9 @@ public: * Returns this invocation's support for readConcern. */ virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { - return {level == repl::ReadConcernLevel::kLocalReadConcern, - ReadConcernSupportResult::DefaultReadConcern::kNotPermitted}; + return {{level != repl::ReadConcernLevel::kLocalReadConcern, + {ErrorCodes::InvalidOptions, "read concern not supported"}}, + {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}}; } /** @@ -667,8 +636,9 @@ public: virtual ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const { - return {level == repl::ReadConcernLevel::kLocalReadConcern, - ReadConcernSupportResult::DefaultReadConcern::kNotPermitted}; + return {{level != repl::ReadConcernLevel::kLocalReadConcern, + {ErrorCodes::InvalidOptions, "read concern not supported"}}, + {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}}; } virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const { diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index da2033e7645..b31bb5822cc 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -93,8 +93,7 @@ public: ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const override { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } ReadWriteType getReadWriteType() const override { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index ff93a8fcfae..4da2af80abf 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -87,8 +87,7 @@ public: ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const override { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } ReadWriteType getReadWriteType() const override { diff --git a/src/mongo/db/commands/explain_cmd.cpp b/src/mongo/db/commands/explain_cmd.cpp index 78fa4d67217..2d3460d6a63 100644 --- a/src/mongo/db/commands/explain_cmd.cpp +++ b/src/mongo/db/commands/explain_cmd.cpp @@ -170,7 +170,7 @@ std::unique_ptr<CommandInvocation> CmdExplain::parse(OperationContext* opCtx, explainedCommand); auto innerRequest = std::make_unique<OpMsgRequest>(OpMsgRequest::fromDBAndBody(dbname, explainedObj)); - auto innerInvocation = explainedCommand->parse(opCtx, *innerRequest); + auto innerInvocation = explainedCommand->parseForExplain(opCtx, *innerRequest, verbosity); return std::make_unique<Invocation>( this, request, std::move(verbosity), std::move(innerRequest), std::move(innerInvocation)); } diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 56e65aad432..bd24d034e29 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -178,8 +178,7 @@ public: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } bool canIgnorePrepareConflicts() const override { diff --git a/src/mongo/db/commands/haystack.cpp b/src/mongo/db/commands/haystack.cpp index a14a4fd3a29..3065e3e3f7c 100644 --- a/src/mongo/db/commands/haystack.cpp +++ b/src/mongo/db/commands/haystack.cpp @@ -76,8 +76,7 @@ public: ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } ReadWriteType getReadWriteType() const { diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 85c220d091a..a70ae8a51d3 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -33,42 +33,62 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/run_aggregate.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" namespace mongo { namespace { -bool isMergePipeline(const std::vector<BSONObj>& pipeline) { - if (pipeline.empty()) { - return false; - } - return pipeline[0].hasField("$mergeCursors"); -} - class PipelineCommand final : public Command { public: PipelineCommand() : Command("aggregate") {} + /** + * It's not known until after parsing whether or not an aggregation command is an explain + * request, because it might include the `explain: true` field (ie. aggregation explains do not + * need to arrive via the `explain` command). Therefore even parsing of regular aggregation + * commands needs to be able to handle the explain case. + * + * As a result, aggregation command parsing is done in parseForExplain(): + * + * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of + * boost::none. + * + * - To parse an aggregation command as the sub-command in an `explain` command, call + * parseForExplain() with `explainVerbosity` set to the desired verbosity. + */ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, const OpMsgRequest& opMsgRequest) override { - // TODO: Parsing to a Pipeline and/or AggregationRequest here. - - auto privileges = - uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) - ->getPrivilegesForAggregate( - AggregationRequest::parseNs( - opMsgRequest.getDatabase().toString(), opMsgRequest.body), - opMsgRequest.body, - false)); - return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges)); + return parseForExplain(opCtx, opMsgRequest, boost::none); + } + + std::unique_ptr<CommandInvocation> parseForExplain( + OperationContext* opCtx, + const OpMsgRequest& opMsgRequest, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) override { + const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON( + opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity)); + + auto privileges = uassertStatusOK( + AuthorizationSession::get(opCtx->getClient()) + ->getPrivilegesForAggregate( + aggregationRequest.getNamespaceString(), opMsgRequest.body, false)); + + return std::make_unique<Invocation>( + this, opMsgRequest, std::move(aggregationRequest), std::move(privileges)); } class Invocation final : public CommandInvocation { public: - Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges) + Invocation(Command* cmd, + const OpMsgRequest& request, + const AggregationRequest aggregationRequest, + PrivilegeVector privileges) : CommandInvocation(cmd), _request(request), _dbName(request.getDatabase().toString()), + _aggregationRequest(std::move(aggregationRequest)), + _liteParsedPipeline(_aggregationRequest), _privileges(std::move(privileges)) {} private: @@ -83,16 +103,10 @@ public: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override { - // Aggregations that are run directly against a collection allow any read concern. - // Otherwise, if the aggregate is collectionless then the read concern must be 'local' - // (e.g. $currentOp). The exception to this is a $changeStream on a whole database, - // which is considered collectionless but must be read concern 'majority'. Further read - // concern validation is done one the pipeline is parsed. - return {level == repl::ReadConcernLevel::kLocalReadConcern || - level == repl::ReadConcernLevel::kMajorityReadConcern || - !AggregationRequest::parseNs(_dbName, _request.body) - .isCollectionlessAggregateNS(), - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return _liteParsedPipeline.supportsReadConcern( + level, + _aggregationRequest.getExplain(), + serverGlobalParams.enableMajorityReadConcern); } bool allowsSpeculativeMajorityReads() const override { @@ -106,29 +120,27 @@ public: CommandHelpers::handleMarkKillOnClientDisconnect( opCtx, !Pipeline::aggHasWriteStage(_request.body)); - const auto aggregationRequest = uassertStatusOK( - AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none)); uassertStatusOK(runAggregate(opCtx, - aggregationRequest.getNamespaceString(), - aggregationRequest, + _aggregationRequest.getNamespaceString(), + _aggregationRequest, + _liteParsedPipeline, _request.body, _privileges, reply)); } NamespaceString ns() const override { - return AggregationRequest::parseNs(_dbName, _request.body); + return _aggregationRequest.getNamespaceString(); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { - const auto aggregationRequest = uassertStatusOK( - AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity)); uassertStatusOK(runAggregate(opCtx, - aggregationRequest.getNamespaceString(), - aggregationRequest, + _aggregationRequest.getNamespaceString(), + _aggregationRequest, + _liteParsedPipeline, _request.body, _privileges, result)); @@ -143,6 +155,8 @@ public: const OpMsgRequest& _request; const std::string _dbName; + const AggregationRequest _aggregationRequest; + const LiteParsedPipeline _liteParsedPipeline; const PrivilegeVector _privileges; }; diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index b7b87dd6018..ef7394df62a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -480,8 +480,18 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe } // namespace Status runAggregate(OperationContext* opCtx, + const NamespaceString& nss, + const AggregationRequest& request, + const BSONObj& cmdObj, + const PrivilegeVector& privileges, + rpc::ReplyBuilderInterface* result) { + return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result); +} + +Status runAggregate(OperationContext* opCtx, const NamespaceString& origNss, const AggregationRequest& request, + const LiteParsedPipeline& liteParsedPipeline, const BSONObj& cmdObj, const PrivilegeVector& privileges, rpc::ReplyBuilderInterface* result) { @@ -504,18 +514,12 @@ Status runAggregate(OperationContext* opCtx, boost::intrusive_ptr<ExpressionContext> expCtx; auto curOp = CurOp::get(opCtx); { - const LiteParsedPipeline liteParsedPipeline(request); - // If we are in a transaction, check whether the parsed pipeline supports // being in a transaction. if (opCtx->inMultiDocumentTransaction()) { liteParsedPipeline.assertSupportsMultiDocumentTransaction(request.getExplain()); } - // Check whether the parsed pipeline supports the given read concern. - liteParsedPipeline.assertSupportsReadConcern( - opCtx, request.getExplain(), serverGlobalParams.enableMajorityReadConcern); - const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); // If this is a collectionless aggregation, we won't create 'ctx' but will still need an diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h index 77d6bd7f9f9..059c48a599b 100644 --- a/src/mongo/db/commands/run_aggregate.h +++ b/src/mongo/db/commands/run_aggregate.h @@ -35,6 +35,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/rpc/op_msg_rpc_impls.h" namespace mongo { @@ -53,6 +54,17 @@ namespace mongo { Status runAggregate(OperationContext* opCtx, const NamespaceString& nss, const AggregationRequest& request, + const LiteParsedPipeline& liteParsedPipeline, + const BSONObj& cmdObj, + const PrivilegeVector& privileges, + rpc::ReplyBuilderInterface* result); + +/** + * Convenience version that internally constructs the LiteParsedPipeline. + */ +Status runAggregate(OperationContext* opCtx, + const NamespaceString& nss, + const AggregationRequest& request, const BSONObj& cmdObj, const PrivilegeVector& privileges, rpc::ReplyBuilderInterface* result); diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index d3492b0c319..8f8ef6f6e79 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -80,15 +80,13 @@ public: } } - void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { - // Only "majority" is allowed for change streams. - uassert( - ErrorCodes::InvalidOptions, - str::stream() - << "$changeStream cannot run with a readConcern other than 'majority'. Current " - << "readConcern: " << readConcern.toString(), - !readConcern.hasLevel() || - readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern); + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + // Change streams require "majority" readConcern. If the client did not specify an + // explicit readConcern, change streams will internally upconvert the readConcern to + // majority (so clients can always send aggregations without readConcern). We therefore + // do not permit the cluster-wide default to be applied. + return onlySingleReadConcernSupported( + kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level); } void assertSupportsMultiDocumentTransaction() const { diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index bc66e0fef64..f2e6470f370 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -79,8 +79,8 @@ public: return true; } - void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { - onlyReadConcernLocalSupported(kStageName, readConcern); + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + return onlyReadConcernLocalSupported(kStageName, level); } void assertSupportsMultiDocumentTransaction() const { diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 9406bf2e126..fabc4e0cfd7 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -69,8 +69,8 @@ public: return false; } - void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { - onlyReadConcernLocalSupported(kStageName, readConcern); + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + return onlyReadConcernLocalSupported(kStageName, level); } void assertSupportsMultiDocumentTransaction() const { diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index b4439a4323d..0b1738b4f53 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -81,8 +81,8 @@ public: return false; } - void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { - onlyReadConcernLocalSupported(DocumentSourceListLocalSessions::kStageName, readConcern); + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + return onlyReadConcernLocalSupported(kStageName, level); } void assertSupportsMultiDocumentTransaction() const { diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index 5902896ee72..cdb582bed14 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -65,8 +65,8 @@ public: return false; } - void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { - onlyReadConcernLocalSupported(DocumentSourcePlanCacheStats::kStageName, readConcern); + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + return onlyReadConcernLocalSupported(kStageName, level); } void assertSupportsMultiDocumentTransaction() const { diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index 064e22389f3..82d9ce1b7f7 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -37,6 +37,7 @@ #include "mongo/db/auth/privilege.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/read_concern_support_result.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/stdx/unordered_set.h" @@ -130,10 +131,11 @@ public: } /** - * Verifies that this stage is allowed to run with the specified read concern. Throws a - * UserException if not compatible. + * Verifies that this stage is allowed to run with the specified read concern level. */ - virtual void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {} + virtual ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const { + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); + } /** * Verifies that this stage is allowed to run in a multi-document transaction. Throws a @@ -150,14 +152,26 @@ protected: << "multi-document transaction."); } - void onlyReadConcernLocalSupported(StringData stageName, - const repl::ReadConcernArgs& readConcern) const { - uassert(ErrorCodes::InvalidOptions, - str::stream() - << "Aggregation stage " << stageName - << " cannot run with a readConcern other than 'local'. Current readConcern: " - << readConcern.toString(), - readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); + ReadConcernSupportResult onlySingleReadConcernSupported( + StringData stageName, + repl::ReadConcernLevel supportedLevel, + repl::ReadConcernLevel candidateLevel) const { + return {{candidateLevel != supportedLevel, + {ErrorCodes::InvalidOptions, + str::stream() << "Aggregation stage " << stageName + << " cannot run with a readConcern other than '" + << repl::readConcernLevels::toString(supportedLevel) + << "'. Current readConcern: " + << repl::readConcernLevels::toString(candidateLevel)}}, + {{ErrorCodes::InvalidOptions, + str::stream() << "Aggregation stage " << stageName + << " does not permit default readConcern to be applied."}}}; + } + + ReadConcernSupportResult onlyReadConcernLocalSupported(StringData stageName, + repl::ReadConcernLevel level) const { + return onlySingleReadConcernSupported( + stageName, repl::ReadConcernLevel::kLocalReadConcern, level); } }; diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp index 6af52389e6a..954239fa928 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp @@ -35,30 +35,50 @@ namespace mongo { -void LiteParsedPipeline::assertSupportsReadConcern( - OperationContext* opCtx, +ReadConcernSupportResult LiteParsedPipeline::supportsReadConcern( + repl::ReadConcernLevel level, boost::optional<ExplainOptions::Verbosity> explain, bool enableMajorityReadConcern) const { - auto readConcern = repl::ReadConcernArgs::get(opCtx); + // Start by assuming that we will support both readConcern and cluster-wide default. + ReadConcernSupportResult result = ReadConcernSupportResult::allSupportedAndDefaultPermitted(); - // Reject non change stream aggregation queries that try to use "majority" read concern when - // enableMajorityReadConcern=false. + // 1. Determine whether the given read concern must be rejected for any pipeline-global reasons. if (!hasChangeStream() && !enableMajorityReadConcern && - (repl::ReadConcernArgs::get(opCtx).getLevel() == - repl::ReadConcernLevel::kMajorityReadConcern)) { - uasserted(ErrorCodes::ReadConcernMajorityNotEnabled, - "Only change stream aggregation queries support 'majority' read concern when " - "enableMajorityReadConcern=false"); + (level == repl::ReadConcernLevel::kMajorityReadConcern)) { + // Reject non change stream aggregation queries that try to use "majority" read concern when + // enableMajorityReadConcern=false. + result.readConcernSupport = { + ErrorCodes::ReadConcernMajorityNotEnabled, + "Only change stream aggregation queries support 'majority' read concern when " + "enableMajorityReadConcern=false"}; + } else if (explain && level != repl::ReadConcernLevel::kLocalReadConcern) { + // Reject non-local read concern when the pipeline is being explained. + result.readConcernSupport = { + ErrorCodes::InvalidOptions, + str::stream() << "Explain for the aggregate command cannot run with a readConcern " + << "other than 'local'. Current readConcern level: " + << repl::readConcernLevels::toString(level)}; } - uassert(ErrorCodes::InvalidOptions, - str::stream() << "Explain for the aggregate command cannot run with a readConcern " - << "other than 'local'. Current readConcern: " << readConcern.toString(), - !explain || readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); + // 2. Determine whether the default read concern must be denied for any pipeline-global reasons. + if (explain) { + result.defaultReadConcernPermit = { + ErrorCodes::InvalidOptions, + "Explain for the aggregate command does not permit default readConcern to be " + "applied."}; + } + // 3. If either the specified or default readConcern have not already been rejected, determine + // whether the pipeline stages support them. If not, we record the first error we encounter. for (auto&& spec : _stageSpecs) { - spec->assertSupportsReadConcern(readConcern); + // If both result statuses are already not OK, stop checking further stages. + if (!result.readConcernSupport.isOK() && !result.defaultReadConcernPermit.isOK()) { + break; + } + result.merge(spec->supportsReadConcern(level)); } + + return result; } void LiteParsedPipeline::assertSupportsMultiDocumentTransaction( @@ -82,8 +102,6 @@ void LiteParsedPipeline::verifyIsSupported( if (opCtx->inMultiDocumentTransaction()) { assertSupportsMultiDocumentTransaction(explain); } - // Verify litePipe can be run at the given read concern. - assertSupportsReadConcern(opCtx, explain, enableMajorityReadConcern); // Verify that no involved namespace is sharded unless allowed by the pipeline. for (const auto& nss : getInvolvedNamespaces()) { uassert(28769, diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 21e309dbaba..91404d25cbf 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/read_concern_support_result.h" namespace mongo { @@ -122,12 +123,11 @@ public: } /** - * Verifies that this pipeline is allowed to run with the specified read concern. This ensures - * that each stage is compatible, and throws a UserException if not. + * Verifies that this pipeline is allowed to run with the specified read concern level. */ - void assertSupportsReadConcern(OperationContext* opCtx, - boost::optional<ExplainOptions::Verbosity> explain, - bool enableMajorityReadConcern) const; + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, + boost::optional<ExplainOptions::Verbosity> explain, + bool enableMajorityReadConcern) const; /** * Verifies that this pipeline is allowed to run in a multi-document transaction. This ensures diff --git a/src/mongo/db/read_concern_support_result.h b/src/mongo/db/read_concern_support_result.h new file mode 100644 index 00000000000..9bca706a15b --- /dev/null +++ b/src/mongo/db/read_concern_support_result.h @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" + +namespace mongo { + +/** + * The result of checking a thing's readConcern support. There are two parts: + * - Whether or not the thing supports the given readConcern. + * - Whether or not the thing permits having the default readConcern applied. + * + * The thing is a command invocation, an aggregation pipeline, or an aggregation stage. + */ +struct ReadConcernSupportResult { + /** + * Convenience method to explicitly return a ReadConcernSupportResult which both supports the + * given read concern and permits the default cluster-wide read concern to be applied. + */ + static ReadConcernSupportResult allSupportedAndDefaultPermitted() { + return {Status::OK(), Status::OK()}; + } + + /** + * Whether this thing supports the requested readConcern level (and if not, why not). + */ + Status readConcernSupport; + + /** + * Whether this thing permits the default readConcern to be applied (and if not, why not). + */ + Status defaultReadConcernPermit; + + /** + * Construct with the given Statuses, or default to Status::OK if omitted. + */ + ReadConcernSupportResult(boost::optional<Status> readConcernStatus, + boost::optional<Status> defaultReadConcernStatus) + : readConcernSupport(readConcernStatus.value_or(Status::OK())), + defaultReadConcernPermit(defaultReadConcernStatus.value_or(Status::OK())) {} + + /** + * Combine the contents of another ReadConcernSupportResult with this one. The outcome is that, + * for each of supported and permitted, this ReadConcernSupportResult will be non-OK if either + * it or the other one is non-OK. If both are non-OK then preference is given to this one's + * Status. + */ + void merge(const ReadConcernSupportResult& other) { + if (readConcernSupport.isOK()) { + readConcernSupport = other.readConcernSupport; + } + if (defaultReadConcernPermit.isOK()) { + defaultReadConcernPermit = other.defaultReadConcernPermit; + } + } +}; + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 4c7f25518c3..11d5ee56538 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -239,8 +239,7 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx, } auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); - if (readConcernSupport.defaultReadConcern == - ReadConcernSupportResult::DefaultReadConcern::kPermitted && + if (readConcernSupport.defaultReadConcernPermit.isOK() && !opCtx->getClient()->isInDirectClient()) { if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { @@ -270,33 +269,45 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(OperationContext* opCtx, } } - auto readConcernLevel = readConcernArgs.getLevel(); // Update the readConcernSupport, in case the default RC was applied. - readConcernSupport = invocation->supportsReadConcern(readConcernLevel); - if (startTransaction && readConcernLevel != repl::ReadConcernLevel::kSnapshotReadConcern && - readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern && - readConcernLevel != repl::ReadConcernLevel::kLocalReadConcern) { - return Status( - ErrorCodes::InvalidOptions, - "The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in " - "order to run in a transaction"); - } - - if (startTransaction && readConcernArgs.getArgsOpTime()) { - return Status(ErrorCodes::InvalidOptions, - str::stream() - << "The readConcern cannot specify '" - << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"); + readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); + + // If we are starting a transaction, we only need to check whether the read concern is + // appropriate for running a transaction. There is no need to check whether the specific + // command supports the read concern, because all commands that are allowed to run in a + // transaction must support all applicable read concerns. + if (startTransaction) { + switch (readConcernArgs.getLevel()) { + case repl::ReadConcernLevel::kLocalReadConcern: + case repl::ReadConcernLevel::kMajorityReadConcern: + case repl::ReadConcernLevel::kSnapshotReadConcern: + // Acceptable readConcern for a transaction. + break; + default: + return {ErrorCodes::InvalidOptions, + "The readConcern level must be either 'local' (default), 'majority' or " + "'snapshot' in " + "order to run in a transaction"}; + } + if (readConcernArgs.getArgsOpTime()) { + return {ErrorCodes::InvalidOptions, + str::stream() << "The readConcern cannot specify '" + << repl::ReadConcernArgs::kAfterOpTimeFieldName + << "' in a transaction"}; + } } - // There is no need to check if the command supports the read concern while in a transaction - // because all commands that are allowed to run in a transaction must support all the read - // concerns that can be used with a transaction. - if (!startTransaction && - readConcernSupport.readConcern == ReadConcernSupportResult::ReadConcern::kNotSupported) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Command does not support read concern " - << readConcernArgs.toString()}; + // Otherwise, if there is a read concern present - either user-specified or from the default - + // then check whether the command supports it. If there is no explicit read concern level, then + // it is implicitly "local". There is no need to check whether this is supported, because all + // commands either support "local" or upconvert the absent readConcern to a stronger level that + // they do support; for instance, $changeStream upconverts to RC level "majority". + if (!startTransaction && readConcernArgs.hasLevel()) { + if (!readConcernSupport.readConcernSupport.isOK()) { + return readConcernSupport.readConcernSupport.withContext( + str::stream() << "Command " << invocation->definition()->getName() + << " does not support " << readConcernArgs.toString()); + } } // If this command invocation asked for 'majority' read concern, supports blocking majority diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index 67e486f9b3a..a3dd0039dfb 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -65,6 +65,12 @@ public: return false; } + ReadConcernSupportResult supportsReadConcern(const std::string& dbName, + const BSONObj& cmdObj, + repl::ReadConcernLevel level) const override { + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); + } + void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) const override { diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index 5dc8f632f60..772f4c8ea0f 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -80,6 +80,7 @@ private: opCtx, ClusterAggregate::Namespaces{nss, nss}, request, + {request}, {Privilege(ResourcePattern::forClusterResource(), ActionType::inprog)}, &responseBuilder); diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 0009fc6f89a..f0493f9fe98 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -75,8 +75,7 @@ public: ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } void addRequiredPrivileges(const std::string& dbname, diff --git a/src/mongo/s/commands/cluster_explain_cmd.cpp b/src/mongo/s/commands/cluster_explain_cmd.cpp index 3736947d113..3e2b3e16b38 100644 --- a/src/mongo/s/commands/cluster_explain_cmd.cpp +++ b/src/mongo/s/commands/cluster_explain_cmd.cpp @@ -189,7 +189,7 @@ std::unique_ptr<CommandInvocation> ClusterExplainCmd::parse(OperationContext* op str::stream() << "Explain failed due to unknown command: " << cmdName, explainedCommand); auto innerRequest = std::make_unique<OpMsgRequest>(OpMsg{explainedObj}); - auto innerInvocation = explainedCommand->parse(opCtx, *innerRequest); + auto innerInvocation = explainedCommand->parseForExplain(opCtx, *innerRequest, verbosity); return std::make_unique<Invocation>( this, request, std::move(verbosity), std::move(innerRequest), std::move(innerInvocation)); } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 7169f2be79a..84133e9c5b8 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -156,8 +156,7 @@ public: ReadConcernSupportResult supportsReadConcern(const std::string& dbName, const BSONObj& cmdObj, repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } void addRequiredPrivileges(const std::string& dbname, diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index 36d15ca6ce8..d6a14cd7428 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -113,8 +113,7 @@ public: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } NamespaceString ns() const override { diff --git a/src/mongo/s/commands/cluster_killcursors_cmd.cpp b/src/mongo/s/commands/cluster_killcursors_cmd.cpp index a90a24d3f53..2408f00fb0b 100644 --- a/src/mongo/s/commands/cluster_killcursors_cmd.cpp +++ b/src/mongo/s/commands/cluster_killcursors_cmd.cpp @@ -46,8 +46,7 @@ public: const BSONObj& cmdObj, repl::ReadConcernLevel level) const final { // killCursors must support read concerns in order to be run in transactions. - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } bool run(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 02a70417d72..3ca37e19909 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -34,6 +34,7 @@ #include "mongo/base/status.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/views/resolved_view.h" #include "mongo/s/query/cluster_aggregate.h" @@ -44,25 +45,52 @@ class ClusterPipelineCommand final : public Command { public: ClusterPipelineCommand() : Command("aggregate") {} + /** + * It's not known until after parsing whether or not an aggregation command is an explain + * request, because it might include the `explain: true` field (ie. aggregation explains do not + * need to arrive via the `explain` command). Therefore even parsing of regular aggregation + * commands needs to be able to handle the explain case. + * + * As a result, aggregation command parsing is done in parseForExplain(): + * + * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of + * boost::none. + * + * - To parse an aggregation command as the sub-command in an `explain` command, call + * parseForExplain() with `explainVerbosity` set to the desired verbosity. + */ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, const OpMsgRequest& opMsgRequest) override { - auto privileges = - uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) - ->getPrivilegesForAggregate( - AggregationRequest::parseNs( - opMsgRequest.getDatabase().toString(), opMsgRequest.body), - opMsgRequest.body, - true)); - // TODO: Parsing to a Pipeline and/or AggregationRequest here. - return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges)); + return parseForExplain(opCtx, opMsgRequest, boost::none); + } + + std::unique_ptr<CommandInvocation> parseForExplain( + OperationContext* opCtx, + const OpMsgRequest& opMsgRequest, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) override { + const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON( + opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity)); + + auto privileges = uassertStatusOK( + AuthorizationSession::get(opCtx->getClient()) + ->getPrivilegesForAggregate( + aggregationRequest.getNamespaceString(), opMsgRequest.body, true)); + + return std::make_unique<Invocation>( + this, opMsgRequest, std::move(aggregationRequest), std::move(privileges)); } class Invocation final : public CommandInvocation { public: - Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges) + Invocation(Command* cmd, + const OpMsgRequest& request, + const AggregationRequest aggregationRequest, + PrivilegeVector privileges) : CommandInvocation(cmd), _request(request), _dbName(request.getDatabase().toString()), + _aggregationRequest(std::move(aggregationRequest)), + _liteParsedPipeline(LiteParsedPipeline(_aggregationRequest)), _privileges(std::move(privileges)) {} private: @@ -71,31 +99,31 @@ public: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return _liteParsedPipeline.supportsReadConcern( + level, + _aggregationRequest.getExplain(), + serverGlobalParams.enableMajorityReadConcern); } void _runAggCommand(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, - boost::optional<ExplainOptions::Verbosity> verbosity, BSONObjBuilder* result) { - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity)); - const auto& nss = aggregationRequest.getNamespaceString(); + const auto& nss = _aggregationRequest.getNamespaceString(); try { uassertStatusOK( ClusterAggregate::runAggregate(opCtx, ClusterAggregate::Namespaces{nss, nss}, - aggregationRequest, + _aggregationRequest, + _liteParsedPipeline, _privileges, result)); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { // If the aggregation failed because the namespace is a view, re-run the command // with the resolved view pipeline and namespace. uassertStatusOK(ClusterAggregate::retryOnViewError(opCtx, - aggregationRequest, + _aggregationRequest, *ex.extraInfo<ResolvedView>(), nss, _privileges, @@ -108,14 +136,14 @@ public: opCtx, !Pipeline::aggHasWriteStage(_request.body)); auto bob = reply->getBodyBuilder(); - _runAggCommand(opCtx, _dbName, _request.body, boost::none, &bob); + _runAggCommand(opCtx, _dbName, _request.body, &bob); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { auto bodyBuilder = result->getBodyBuilder(); - _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder); + _runAggCommand(opCtx, _dbName, _request.body, &bodyBuilder); } void doCheckAuthorization(OperationContext* opCtx) const override { @@ -126,11 +154,13 @@ public: } NamespaceString ns() const override { - return AggregationRequest::parseNs(_dbName, _request.body); + return _aggregationRequest.getNamespaceString(); } const OpMsgRequest& _request; const std::string _dbName; + const AggregationRequest _aggregationRequest; + const LiteParsedPipeline _liteParsedPipeline; const PrivilegeVector _privileges; }; diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 5732d1243d3..c018f4eaf56 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -549,8 +549,7 @@ private: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } void doCheckAuthorization(OperationContext* opCtx) const final { diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 2f734c11654..b0a591cadc5 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -394,9 +394,12 @@ void runCommand(OperationContext* opCtx, !readConcernArgs.getArgsAtClusterTime()); } + auto readConcernSupport = invocation->supportsReadConcern(readConcernArgs.getLevel()); + boost::optional<RouterOperationContextSession> routerSession; try { CommandHelpers::evaluateFailCommandFailPoint(opCtx, commandName, invocation->ns()); + bool startTransaction = false; if (osi.getAutocommit()) { routerSession.emplace(opCtx); @@ -419,6 +422,7 @@ void runCommand(OperationContext* opCtx, return TransactionRouter::TransactionActions::kContinue; })(); + startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); } @@ -455,6 +459,55 @@ void runCommand(OperationContext* opCtx, opCtx->setWriteConcern(wc); } + // If we are starting a transaction, we only need to check whether the read concern is + // appropriate for running a transaction. There is no need to check whether the specific + // command supports the read concern, because all commands that are allowed to run in a + // transaction must support all applicable read concerns. + if (startTransaction) { + switch (readConcernArgs.getLevel()) { + case repl::ReadConcernLevel::kLocalReadConcern: + case repl::ReadConcernLevel::kMajorityReadConcern: + case repl::ReadConcernLevel::kSnapshotReadConcern: + // Acceptable readConcern for a transaction. + break; + default: + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow( + responseBuilder, + {ErrorCodes::InvalidOptions, + "The readConcern level must be either 'local' (default), 'majority' or " + "'snapshot' in order to run in a transaction"}); + return; + } + if (readConcernArgs.getArgsOpTime()) { + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow( + responseBuilder, + {ErrorCodes::InvalidOptions, + str::stream() + << "The readConcern cannot specify '" + << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"}); + return; + } + } + + // Otherwise, if there is a read concern present - either user-specified or the default - + // then check whether the command supports it. If there is no explicit read concern level, + // then it is implicitly "local". There is no need to check whether this is supported, + // because all commands either support "local" or upconvert the absent readConcern to a + // stronger level that they do support; e.g. $changeStream upconverts to RC "majority". + if (!startTransaction && readConcernArgs.hasLevel()) { + if (!readConcernSupport.readConcernSupport.isOK()) { + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow( + responseBuilder, + readConcernSupport.readConcernSupport.withContext( + str::stream() << "Command " << invocation->definition()->getName() + << " does not support " << readConcernArgs.toString())); + return; + } + } + for (int tries = 0;; ++tries) { // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. bool canRetry = tries < kMaxNumStaleVersionRetries - 1; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 7ee03280a30..ad307c3bfe5 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -145,6 +145,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const AggregationRequest& request, const PrivilegeVector& privileges, BSONObjBuilder* result) { + return runAggregate(opCtx, namespaces, request, {request}, privileges, result); +} + +Status ClusterAggregate::runAggregate(OperationContext* opCtx, + const Namespaces& namespaces, + const AggregationRequest& request, + const LiteParsedPipeline& liteParsedPipeline, + const PrivilegeVector& privileges, + BSONObjBuilder* result) { uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec()); uassert(51143, "Cannot specify runtime constants option to a mongos", @@ -161,11 +170,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return resolvedNsRoutingInfo.cm().get(); }; - LiteParsedPipeline litePipe(request); - litePipe.verifyIsSupported( + liteParsedPipeline.verifyIsSupported( opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern); - auto hasChangeStream = litePipe.hasChangeStream(); - auto involvedNamespaces = litePipe.getInvolvedNamespaces(); + auto hasChangeStream = liteParsedPipeline.hasChangeStream(); + auto involvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); // If the routing table is valid, we obtain a reference to it. If the table is not valid, then // either the database does not exist, or there are no shards in the cluster. In the latter @@ -214,14 +222,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return pipeline; }; - auto targeter = - sharded_agg_helpers::AggregationTargeter::make(opCtx, - namespaces.executionNss, - pipelineBuilder, - routingInfo, - involvedNamespaces, - hasChangeStream, - litePipe.allowedToPassthroughFromMongos()); + auto targeter = sharded_agg_helpers::AggregationTargeter::make( + opCtx, + namespaces.executionNss, + pipelineBuilder, + routingInfo, + involvedNamespaces, + hasChangeStream, + liteParsedPipeline.allowedToPassthroughFromMongos()); if (!expCtx) { // When the AggregationTargeter chooses a "passthrough" policy, it does not call the @@ -308,8 +316,8 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx, nsStruct.requestedNss = requestedNss; nsStruct.executionNss = resolvedView.getNamespace(); - auto status = - ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, privileges, result); + auto status = ClusterAggregate::runAggregate( + opCtx, nsStruct, resolvedAggRequest, {resolvedAggRequest}, privileges, result); // If the underlying namespace was changed to a view during retry, then re-run the aggregation // on the new resolved namespace. diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 9d3c0a90eba..4fb474d2f8f 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/commands/strategy.h" @@ -45,7 +46,6 @@ namespace mongo { -class LiteParsedPipeline; class OperationContext; class ShardId; @@ -85,6 +85,16 @@ public: static Status runAggregate(OperationContext* opCtx, const Namespaces& namespaces, const AggregationRequest& request, + const LiteParsedPipeline& liteParsedPipeline, + const PrivilegeVector& privileges, + BSONObjBuilder* result); + + /** + * Convenience version that internally constructs the LiteParsedPipeline. + */ + static Status runAggregate(OperationContext* opCtx, + const Namespaces& namespaces, + const AggregationRequest& request, const PrivilegeVector& privileges, BSONObjBuilder* result); diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp index 0b27b29042f..d48ba9f3d41 100644 --- a/src/mongo/s/query/cluster_aggregate_test.cpp +++ b/src/mongo/s/query/cluster_aggregate_test.cpp @@ -103,6 +103,7 @@ protected: return ClusterAggregate::runAggregate(opCtx.get(), ClusterAggregate::Namespaces{nss, nss}, request.getValue(), + {request.getValue()}, PrivilegeVector(), &result); } |