summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/pipeline_command.cpp
diff options
context:
space:
mode:
authorAnthony Roy <anthony.roy@10gen.com>2018-07-10 14:16:42 -0400
committerAnthony Roy <anthony.roy@10gen.com>2018-07-11 13:36:19 -0400
commitc27e72a4979b1b2ef241d23d2ad1434cdb3ff747 (patch)
tree670809642e1108c460fcb6cbfc12025da0f8b739 /src/mongo/db/commands/pipeline_command.cpp
parentf743d10196903d7f24f8cf4bb5ca6b861627e5b5 (diff)
downloadmongo-c27e72a4979b1b2ef241d23d2ad1434cdb3ff747.tar.gz
SERVER-35912 Upgraded PipelineCommand and ClusterPipelineCommand to TypedCommand
Moved the pipeline commands from BasicCommand to Command in order to gain access to DocumentSequences.
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp122
1 files changed, 71 insertions, 51 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 24142d7da5d..a7096773104 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -44,72 +44,92 @@ bool isMergePipeline(const std::vector<BSONObj>& pipeline) {
return pipeline[0].hasField("$mergeCursors");
}
-class PipelineCommand : public BasicCommand {
+class PipelineCommand final : public Command {
public:
- PipelineCommand() : BasicCommand("aggregate") {}
+ PipelineCommand() : Command("aggregate") {}
+
+ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest) override {
+ // TODO: Parsing to a Pipeline and/or AggregationRequest here.
+ return std::make_unique<Invocation>(this, opMsgRequest);
+ }
+
+ class Invocation final : public CommandInvocation {
+ public:
+ Invocation(Command* cmd, const OpMsgRequest& request)
+ : CommandInvocation(cmd),
+ _request(request),
+ _dbName(request.getDatabase().toString()) {}
+
+ private:
+ bool supportsWriteConcern() const override {
+ return Pipeline::aggSupportsWriteConcern(this->_request.body);
+ }
+
+ bool supportsReadConcern(repl::ReadConcernLevel level) const override {
+ // Aggregations that are run directly against a collection allow any read concern.
+ // Otherwise, if the aggregate is collectionless then the read concern must be 'local'
+ // (e.g. $currentOp). The exception to this is a $changeStream on a whole database,
+ // which is
+ // considered collectionless but must be read concern 'majority'. Further read concern
+ // validation is done one the pipeline is parsed.
+ return level == repl::ReadConcernLevel::kLocalReadConcern ||
+ level == repl::ReadConcernLevel::kMajorityReadConcern ||
+ !AggregationRequest::parseNs(_dbName, _request.body).isCollectionlessAggregateNS();
+ }
+
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override {
+ auto bob = reply->getBodyBuilder();
+ const auto aggregationRequest = uassertStatusOK(
+ AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none));
+
+ uassertStatusOK(runAggregate(opCtx,
+ aggregationRequest.getNamespaceString(),
+ aggregationRequest,
+ _request.body,
+ bob));
+ }
+
+ NamespaceString ns() const override {
+ return AggregationRequest::parseNs(_dbName, _request.body);
+ }
+
+ void explain(OperationContext* opCtx,
+ ExplainOptions::Verbosity verbosity,
+ BSONObjBuilder* out) override {
+ const auto aggregationRequest = uassertStatusOK(
+ AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity));
+
+ uassertStatusOK(runAggregate(opCtx,
+ aggregationRequest.getNamespaceString(),
+ aggregationRequest,
+ _request.body,
+ *out));
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ const auto nss = ns();
+ uassertStatusOK(AuthorizationSession::get(opCtx->getClient())
+ ->checkAuthForAggregate(nss, _request.body, false));
+ }
+
+ const OpMsgRequest& _request;
+ const std::string _dbName;
+ };
std::string help() const override {
return "Runs the aggregation command. See http://dochub.mongodb.org/core/aggregation for "
"more details.";
}
- bool supportsWriteConcern(const BSONObj& cmd) const override {
- return Pipeline::aggSupportsWriteConcern(cmd);
- }
-
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kOptIn;
}
- bool supportsReadConcern(const std::string& dbName,
- const BSONObj& cmdObj,
- repl::ReadConcernLevel level) const override {
- // Aggregations that are run directly against a collection allow any read concern.
- // Otherwise, if the aggregate is collectionless then the read concern must be 'local' (e.g.
- // $currentOp). The exception to this is a $changeStream on a whole database, which is
- // considered collectionless but must be read concern 'majority'. Further read concern
- // validation is done one the pipeline is parsed.
- return level == repl::ReadConcernLevel::kLocalReadConcern ||
- level == repl::ReadConcernLevel::kMajorityReadConcern ||
- !AggregationRequest::parseNs(dbName, cmdObj).isCollectionlessAggregateNS();
- }
-
ReadWriteType getReadWriteType() const {
return ReadWriteType::kRead;
}
- Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) const override {
- const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj));
- return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, false);
- }
-
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
- const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, boost::none));
-
- uassertStatusOK(runAggregate(
- opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, result));
- return true;
- }
-
- Status explain(OperationContext* opCtx,
- const OpMsgRequest& request,
- ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
- std::string dbname = request.getDatabase().toString();
- const BSONObj& cmdObj = request.body;
- const auto aggregationRequest =
- uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity));
-
- return runAggregate(
- opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, *out);
- }
-
} pipelineCmd;
} // namespace