summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_pipeline_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_pipeline_cmd.cpp')
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp72
1 files changed, 51 insertions, 21 deletions
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 02a70417d72..3ca37e19909 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -34,6 +34,7 @@
#include "mongo/base/status.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/s/query/cluster_aggregate.h"
@@ -44,25 +45,52 @@ class ClusterPipelineCommand final : public Command {
public:
ClusterPipelineCommand() : Command("aggregate") {}
+ /**
+ * It's not known until after parsing whether or not an aggregation command is an explain
+ * request, because it might include the `explain: true` field (ie. aggregation explains do not
+ * need to arrive via the `explain` command). Therefore even parsing of regular aggregation
+ * commands needs to be able to handle the explain case.
+ *
+ * As a result, aggregation command parsing is done in parseForExplain():
+ *
+ * - To parse a regular aggregation command, call parseForExplain() with `explainVerbosity` of
+ * boost::none.
+ *
+ * - To parse an aggregation command as the sub-command in an `explain` command, call
+ * parseForExplain() with `explainVerbosity` set to the desired verbosity.
+ */
std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
const OpMsgRequest& opMsgRequest) override {
- auto privileges =
- uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
- ->getPrivilegesForAggregate(
- AggregationRequest::parseNs(
- opMsgRequest.getDatabase().toString(), opMsgRequest.body),
- opMsgRequest.body,
- true));
- // TODO: Parsing to a Pipeline and/or AggregationRequest here.
- return std::make_unique<Invocation>(this, opMsgRequest, std::move(privileges));
+ return parseForExplain(opCtx, opMsgRequest, boost::none);
+ }
+
+ std::unique_ptr<CommandInvocation> parseForExplain(
+ OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) override {
+ const auto aggregationRequest = uassertStatusOK(AggregationRequest::parseFromBSON(
+ opMsgRequest.getDatabase().toString(), opMsgRequest.body, explainVerbosity));
+
+ auto privileges = uassertStatusOK(
+ AuthorizationSession::get(opCtx->getClient())
+ ->getPrivilegesForAggregate(
+ aggregationRequest.getNamespaceString(), opMsgRequest.body, true));
+
+ return std::make_unique<Invocation>(
+ this, opMsgRequest, std::move(aggregationRequest), std::move(privileges));
}
class Invocation final : public CommandInvocation {
public:
- Invocation(Command* cmd, const OpMsgRequest& request, PrivilegeVector privileges)
+ Invocation(Command* cmd,
+ const OpMsgRequest& request,
+ const AggregationRequest aggregationRequest,
+ PrivilegeVector privileges)
: CommandInvocation(cmd),
_request(request),
_dbName(request.getDatabase().toString()),
+ _aggregationRequest(std::move(aggregationRequest)),
+ _liteParsedPipeline(LiteParsedPipeline(_aggregationRequest)),
_privileges(std::move(privileges)) {}
private:
@@ -71,31 +99,31 @@ public:
}
ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const override {
- return {ReadConcernSupportResult::ReadConcern::kSupported,
- ReadConcernSupportResult::DefaultReadConcern::kPermitted};
+ return _liteParsedPipeline.supportsReadConcern(
+ level,
+ _aggregationRequest.getExplain(),
+ serverGlobalParams.enableMajorityReadConcern);
}
void _runAggCommand(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> verbosity,
BSONObjBuilder* result) {
- const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
- const auto& nss = aggregationRequest.getNamespaceString();
+ const auto& nss = _aggregationRequest.getNamespaceString();
try {
uassertStatusOK(
ClusterAggregate::runAggregate(opCtx,
ClusterAggregate::Namespaces{nss, nss},
- aggregationRequest,
+ _aggregationRequest,
+ _liteParsedPipeline,
_privileges,
result));
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
// If the aggregation failed because the namespace is a view, re-run the command
// with the resolved view pipeline and namespace.
uassertStatusOK(ClusterAggregate::retryOnViewError(opCtx,
- aggregationRequest,
+ _aggregationRequest,
*ex.extraInfo<ResolvedView>(),
nss,
_privileges,
@@ -108,14 +136,14 @@ public:
opCtx, !Pipeline::aggHasWriteStage(_request.body));
auto bob = reply->getBodyBuilder();
- _runAggCommand(opCtx, _dbName, _request.body, boost::none, &bob);
+ _runAggCommand(opCtx, _dbName, _request.body, &bob);
}
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
auto bodyBuilder = result->getBodyBuilder();
- _runAggCommand(opCtx, _dbName, _request.body, verbosity, &bodyBuilder);
+ _runAggCommand(opCtx, _dbName, _request.body, &bodyBuilder);
}
void doCheckAuthorization(OperationContext* opCtx) const override {
@@ -126,11 +154,13 @@ public:
}
NamespaceString ns() const override {
- return AggregationRequest::parseNs(_dbName, _request.body);
+ return _aggregationRequest.getNamespaceString();
}
const OpMsgRequest& _request;
const std::string _dbName;
+ const AggregationRequest _aggregationRequest;
+ const LiteParsedPipeline _liteParsedPipeline;
const PrivilegeVector _privileges;
};