diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_pipeline_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_pipeline_cmd.cpp | 72 |
1 files changed, 51 insertions, 21 deletions
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 02a70417d72..3ca37e19909 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -34,6 +34,7 @@ #include "mongo/base/status.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/views/resolved_view.h" #include "mongo/s/query/cluster_aggregate.h" @@ -44,25 +45,52 @@ class ClusterPipelineCommand final : public Command { public: ClusterPipelineCommand() : Command("aggregate") {} + /** + * It's not known until after parsing whether or not an aggregation command is an explain + * request, because it might include the `explain: true` field (ie. aggregation explains do not + * need to arrive via the `explain` command). Therefore even parsing of regular aggregation + * commands needs to be able to handle the explain case. + * + * As a result, aggregation command parsing is done in parseForExplain(): + * + * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of + * boost::none. + * + * - To parse an aggregation command as the sub-command in an `explain` command, call + * parseForExplain() with `explainVerbosity` set to the desired verbosity. + */ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, const OpMsgRequest& opMsgRequest) override { - auto privileges = - uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) - ->getPrivilegesForAggregate( - AggregationRequest::parseNs( - opMsgRequest.getDatabase().toString(), opMsgRequest.body), - opMsgRequest.body, - true)); - // TODO: Parsing to a Pipeline and/or AggregationRequest here. - return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges)); + return parseForExplain(opCtx, opMsgRequest, boost::none); + } + + std::unique_ptr<CommandInvocation> parseForExplain( + OperationContext* opCtx, + const OpMsgRequest& opMsgRequest, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) override { + const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON( + opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity)); + + auto privileges = uassertStatusOK( + AuthorizationSession::get(opCtx->getClient()) + ->getPrivilegesForAggregate( + aggregationRequest.getNamespaceString(), opMsgRequest.body, true)); + + return std::make_unique<Invocation>( + this, opMsgRequest, std::move(aggregationRequest), std::move(privileges)); } class Invocation final : public CommandInvocation { public: - Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges) + Invocation(Command* cmd, + const OpMsgRequest& request, + const AggregationRequest aggregationRequest, + PrivilegeVector privileges) : CommandInvocation(cmd), _request(request), _dbName(request.getDatabase().toString()), + _aggregationRequest(std::move(aggregationRequest)), + _liteParsedPipeline(LiteParsedPipeline(_aggregationRequest)), _privileges(std::move(privileges)) {} private: @@ -71,31 +99,31 @@ public: } ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override { - return {ReadConcernSupportResult::ReadConcern::kSupported, - ReadConcernSupportResult::DefaultReadConcern::kPermitted}; + return _liteParsedPipeline.supportsReadConcern( + level, + _aggregationRequest.getExplain(), + serverGlobalParams.enableMajorityReadConcern); } void _runAggCommand(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, - boost::optional<ExplainOptions::Verbosity> verbosity, BSONObjBuilder* result) { - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity)); - const auto& nss = aggregationRequest.getNamespaceString(); + const auto& nss = _aggregationRequest.getNamespaceString(); try { uassertStatusOK( ClusterAggregate::runAggregate(opCtx, ClusterAggregate::Namespaces{nss, nss}, - aggregationRequest, + _aggregationRequest, + _liteParsedPipeline, _privileges, result)); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { // If the aggregation failed because the namespace is a view, re-run the command // with the resolved view pipeline and namespace. uassertStatusOK(ClusterAggregate::retryOnViewError(opCtx, - aggregationRequest, + _aggregationRequest, *ex.extraInfo<ResolvedView>(), nss, _privileges, @@ -108,14 +136,14 @@ public: opCtx, !Pipeline::aggHasWriteStage(_request.body)); auto bob = reply->getBodyBuilder(); - _runAggCommand(opCtx, _dbName, _request.body, boost::none, &bob); + _runAggCommand(opCtx, _dbName, _request.body, &bob); } void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { auto bodyBuilder = result->getBodyBuilder(); - _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder); + _runAggCommand(opCtx, _dbName, _request.body, &bodyBuilder); } void doCheckAuthorization(OperationContext* opCtx) const override { @@ -126,11 +154,13 @@ public: } NamespaceString ns() const override { - return AggregationRequest::parseNs(_dbName, _request.body); + return _aggregationRequest.getNamespaceString(); } const OpMsgRequest& _request; const std::string _dbName; + const AggregationRequest _aggregationRequest; + const LiteParsedPipeline _liteParsedPipeline; const PrivilegeVector _privileges; }; |