diff options
author | Anthony Roy <anthony.roy@10gen.com> | 2018-07-10 14:16:42 -0400 |
---|---|---|
committer | Anthony Roy <anthony.roy@10gen.com> | 2018-07-11 13:36:19 -0400 |
commit | c27e72a4979b1b2ef241d23d2ad1434cdb3ff747 (patch) | |
tree | 670809642e1108c460fcb6cbfc12025da0f8b739 /src/mongo/db/commands/pipeline_command.cpp | |
parent | f743d10196903d7f24f8cf4bb5ca6b861627e5b5 (diff) | |
download | mongo-c27e72a4979b1b2ef241d23d2ad1434cdb3ff747.tar.gz |
SERVER-35912 Upgraded PipelineCommand and ClusterPipelineCommand to TypedCommand
Moved the pipeline commands from BasicCommand to Command in order to
gain access to DocumentSequences.
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 122 |
1 files changed, 71 insertions, 51 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 24142d7da5d..a7096773104 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -44,72 +44,92 @@ bool isMergePipeline(const std::vector<BSONObj>& pipeline) { return pipeline[0].hasField("$mergeCursors"); } -class PipelineCommand : public BasicCommand { +class PipelineCommand final : public Command { public: - PipelineCommand() : BasicCommand("aggregate") {} + PipelineCommand() : Command("aggregate") {} + + std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, + const OpMsgRequest& opMsgRequest) override { + // TODO: Parsing to a Pipeline and/or AggregationRequest here. + return std::make_unique<Invocation>(this, opMsgRequest); + } + + class Invocation final : public CommandInvocation { + public: + Invocation(Command* cmd, const OpMsgRequest& request) + : CommandInvocation(cmd), + _request(request), + _dbName(request.getDatabase().toString()) {} + + private: + bool supportsWriteConcern() const override { + return Pipeline::aggSupportsWriteConcern(this->_request.body); + } + + bool supportsReadConcern(repl::ReadConcernLevel level) const override { + // Aggregations that are run directly against a collection allow any read concern. + // Otherwise, if the aggregate is collectionless then the read concern must be 'local' + // (e.g. $currentOp). The exception to this is a $changeStream on a whole database, + // which is + // considered collectionless but must be read concern 'majority'. Further read concern + // validation is done one the pipeline is parsed. + return level == repl::ReadConcernLevel::kLocalReadConcern || + level == repl::ReadConcernLevel::kMajorityReadConcern || + !AggregationRequest::parseNs(_dbName, _request.body).isCollectionlessAggregateNS(); + } + + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* reply) override { + auto bob = reply->getBodyBuilder(); + const auto aggregationRequest = uassertStatusOK( + AggregationRequest::parseFromBSON(_dbName, _request.body, boost::none)); + + uassertStatusOK(runAggregate(opCtx, + aggregationRequest.getNamespaceString(), + aggregationRequest, + _request.body, + bob)); + } + + NamespaceString ns() const override { + return AggregationRequest::parseNs(_dbName, _request.body); + } + + void explain(OperationContext* opCtx, + ExplainOptions::Verbosity verbosity, + BSONObjBuilder* out) override { + const auto aggregationRequest = uassertStatusOK( + AggregationRequest::parseFromBSON(_dbName, _request.body, verbosity)); + + uassertStatusOK(runAggregate(opCtx, + aggregationRequest.getNamespaceString(), + aggregationRequest, + _request.body, + *out)); + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + const auto nss = ns(); + uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForAggregate(nss, _request.body, false)); + } + + const OpMsgRequest& _request; + const std::string _dbName; + }; std::string help() const override { return "Runs the aggregation command. See http://dochub.mongodb.org/core/aggregation for " "more details."; } - bool supportsWriteConcern(const BSONObj& cmd) const override { - return Pipeline::aggSupportsWriteConcern(cmd); - } - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kOptIn; } - bool supportsReadConcern(const std::string& dbName, - const BSONObj& cmdObj, - repl::ReadConcernLevel level) const override { - // Aggregations that are run directly against a collection allow any read concern. - // Otherwise, if the aggregate is collectionless then the read concern must be 'local' (e.g. - // $currentOp). The exception to this is a $changeStream on a whole database, which is - // considered collectionless but must be read concern 'majority'. Further read concern - // validation is done one the pipeline is parsed. - return level == repl::ReadConcernLevel::kLocalReadConcern || - level == repl::ReadConcernLevel::kMajorityReadConcern || - !AggregationRequest::parseNs(dbName, cmdObj).isCollectionlessAggregateNS(); - } - ReadWriteType getReadWriteType() const { return ReadWriteType::kRead; } - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const override { - const NamespaceString nss(AggregationRequest::parseNs(dbname, cmdObj)); - return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj, false); - } - - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, boost::none)); - - uassertStatusOK(runAggregate( - opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, result)); - return true; - } - - Status explain(OperationContext* opCtx, - const OpMsgRequest& request, - ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { - std::string dbname = request.getDatabase().toString(); - const BSONObj& cmdObj = request.body; - const auto aggregationRequest = - uassertStatusOK(AggregationRequest::parseFromBSON(dbname, cmdObj, verbosity)); - - return runAggregate( - opCtx, aggregationRequest.getNamespaceString(), aggregationRequest, cmdObj, *out); - } - } pipelineCmd; } // namespace |