diff options
24 files changed, 323 insertions, 49 deletions
diff --git a/jstests/noPassthrough/server_status_aggregation_stage_counter.js b/jstests/noPassthrough/server_status_aggregation_stage_counter.js new file mode 100644 index 00000000000..95069f30c0f --- /dev/null +++ b/jstests/noPassthrough/server_status_aggregation_stage_counter.js @@ -0,0 +1,144 @@ +/** + * Tests for serverStatus metrics.stage stats. + * @tags: [requires_sharding] + */ +(function() { + "use strict"; + + // In memory map of stage names to their counters. Used to verify that serverStatus is + // incrementing the appropriate stages correctly across multiple pipelines. + let countersWeExpectToIncreaseMap = {}; + + function checkCounters( + command, countersWeExpectToIncrease, countersWeExpectNotToIncrease = []) { + // Capture the pre-aggregation counts of the stages which we expect not to increase. + let metrics = db.serverStatus().metrics.aggStageCounters; + let noIncreaseCounterMap = {}; + for (let stage of countersWeExpectNotToIncrease) { + noIncreaseCounterMap[stage] = metrics[stage]; + } + + // Update in memory map to reflect what each counter's count should be after running + // 'command'. + for (let stage of countersWeExpectToIncrease) { + if (!countersWeExpectToIncreaseMap[stage]) { + countersWeExpectToIncreaseMap[stage] = 0; + } + countersWeExpectToIncreaseMap[stage]++; + } + + // Run the command and update metrics to reflect the post-command serverStatus state. + command(); + metrics = db.serverStatus().metrics.aggStageCounters; + // Verify that serverStatus reflects expected counters. + for (let stage of countersWeExpectToIncrease) { + assert.eq(metrics[stage], countersWeExpectToIncreaseMap[stage]); + } + + // Verify that the counters which we expect not to increase did not do so. + for (let stage of countersWeExpectNotToIncrease) { + assert.eq(metrics[stage], noIncreaseCounterMap[stage]); + } + } + + function runTests(db, coll) { + // Reset our counter map before running any aggregations. + countersWeExpectToIncreaseMap = {}; + + // Setup for agg stages which have nested pipelines. + assert.commandWorked(coll.insert([ + {"_id": 1, "item": "almonds", "price": 12, "quantity": 2}, + {"_id": 2, "item": "pecans", "price": 20, "quantity": 1}, + {"_id": 3} + ])); + + assert.commandWorked(db.inventory.insert([ + {"_id": 1, "sku": "almonds", description: "product 1", "instock": 120}, + {"_id": 2, "sku": "bread", description: "product 2", "instock": 80}, + {"_id": 3, "sku": "cashews", description: "product 3", "instock": 60}, + {"_id": 4, "sku": "pecans", description: "product 4", "instock": 70}, + {"_id": 5, "sku": null, description: "Incomplete"}, + {"_id": 6} + ])); + + // $skip + checkCounters(() => coll.aggregate([{$skip: 5}]).toArray(), ['$skip']); + // $project is an alias for $unset. + checkCounters(() => coll.aggregate([{$project: {title: 1, author: 1}}]).toArray(), + ['$project'], + ['$unset']); + // $count is an alias for $project and $group. + checkCounters( + () => coll.aggregate([{$count: "test"}]).toArray(), ['$count'], ['$project', '$group']); + + // $lookup + checkCounters(() => coll.aggregate([{ + $lookup: { + from: "inventory", + pipeline: [{$match: {inStock: 70}}], + as: "inventory_docs" + } + }]) + .toArray(), + ['$lookup', "$match"]); + + // $facet + checkCounters(() => coll.aggregate([{ + $facet: { + "a": [{$match: {price: {$exists: 1}}}], + "b": [{$project: {title: 1}}] + } + }]) + .toArray(), + ['$facet', '$match', "$project"]); + + // Verify that explain ticks counters. + checkCounters(() => coll.explain().aggregate([{$match: {a: 5}}]), ["$match"]); + + // Verify that a stage which appears multiple times in a pipeline has an accurate count. + checkCounters(() => coll.aggregate([ + { + $facet: { + "a": [{$match: {price: {$exists: 1}}}], + "b": [{$project: {title: 1}}] + } + }, + { + $facet: { + "c": [{$match: {instock: {$exists: 1}}}], + "d": [{$project: {title: 0}}] + } + } + ]) + .toArray(), + ["$facet", "$match", "$project", "$facet", "$match", "$project"]); + + // Verify that a pipeline used in a view ticks counters. + const viewName = "counterView"; + assert.commandWorked(db.createView(viewName, coll.getName(), [{"$project": {_id: 0}}])); + // Note that $project's counter will also be ticked since the $project used to generate the + // view and will be stitched together with the pipeline specified to the aggregate command. + checkCounters(() => db[viewName].aggregate([{$match: {a: 5}}]).toArray(), + ["$match", "$project"]); + } + + // Standalone + const conn = MongoRunner.runMongod(); + assert.neq(null, conn, "mongod was unable to start up"); + let db = conn.getDB(jsTest.name()); + const collName = jsTest.name(); + let coll = db[collName]; + runTests(db, coll); + + MongoRunner.stopMongod(conn); + + // Sharded cluster + const st = new ShardingTest({shards: 2}); + db = st.s.getDB(jsTest.name()); + coll = db[collName]; + st.shardColl(coll.getFullName(), {_id: 1}, {_id: "hashed"}); + + runTests(db, coll); + + st.stop(); +}()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index f9e07a9b379..9daa28c57b6 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -336,9 +336,9 @@ Status runAggregate(OperationContext* opCtx, unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; boost::intrusive_ptr<ExpressionContext> expCtx; Pipeline* unownedPipeline; + const LiteParsedPipeline liteParsedPipeline(request); auto curOp = CurOp::get(opCtx); { - const LiteParsedPipeline liteParsedPipeline(request); try { // Check whether the parsed pipeline supports the given read concern. @@ -558,6 +558,9 @@ Status runAggregate(OperationContext* opCtx, ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); + // Report usage statistics for each stage in the pipeline. + liteParsedPipeline.tickGlobalStageCounters(); + // If both explain and cursor are specified, explain wins. if (expCtx->explain) { Explain::explainPipelineExecutor( diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index fb911760111..bf753f4be96 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -256,6 +256,8 @@ env.Library( 'lite_parsed_document_source.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/commands/server_status_core', + '$BUILD_DIR/mongo/db/stats/counters', 'aggregation_request', ] ) diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 92d2ed6ffdf..bd3aa246baf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -49,10 +49,12 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec); + return stdx::make_unique<LiteParsed>( + spec.fieldName(), request.getNamespaceString(), spec); } - explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) { + explicit LiteParsed(std::string parseTimeName, NamespaceString nss, BSONElement spec) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) { // We don't do any validation here, just a minimal check for the resume token. We also // do not need to extract the token unless the stream is running on a single namespace. if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) { diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index b71898dbca9..2e41e825df6 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -44,10 +44,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString()); + return std::make_unique<LiteParsed>(spec.fieldName(), request.getNamespaceString()); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} bool isCollStats() const final { return true; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index adede47959b..18030e7fbbc 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -99,7 +99,8 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li } } - return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps); + return std::make_unique<DocumentSourceCurrentOp::LiteParsed>( + spec.fieldName(), allUsers, localOps); } diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index f189325e93c..de2b0dc5918 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -49,8 +49,10 @@ public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); - LiteParsed(UserMode allUsers, LocalOpsMode localOps) - : _allUsers(allUsers), _localOps(localOps) {} + LiteParsed(std::string parseTimeName, UserMode allUsers, LocalOpsMode localOps) + : LiteParsedDocumentSource(std::move(parseTimeName)), + _allUsers(allUsers), + _localOps(localOps) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 577b156ef6d..9e2f13bf07d 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -129,8 +129,8 @@ std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed pipeline.requiredPrivileges(unusedIsMongosFlag)); } - return stdx::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines), - std::move(requiredPrivileges)); + return stdx::make_unique<DocumentSourceFacet::LiteParsed>( + spec.fieldName(), std::move(liteParsedPipelines), std::move(requiredPrivileges)); } stdx::unordered_set<NamespaceString> DocumentSourceFacet::LiteParsed::getInvolvedNamespaces() diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 3f98fe1bdfa..1d1ab36b163 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -72,8 +72,11 @@ public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); - LiteParsed(std::vector<LiteParsedPipeline> liteParsedPipelines, PrivilegeVector privileges) - : _liteParsedPipelines(std::move(liteParsedPipelines)), + LiteParsed(std::string parseTimeName, + std::vector<LiteParsedPipeline> liteParsedPipelines, + PrivilegeVector privileges) + : LiteParsedDocumentSource(std::move(parseTimeName)), + _liteParsedPipelines(std::move(liteParsedPipelines)), _requiredPrivileges(std::move(privileges)) {} PrivilegeVector requiredPrivileges(bool isMongos) const final { @@ -82,6 +85,10 @@ public: stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final; + const std::vector<LiteParsedPipeline>& getSubPipelines() const final { + return _liteParsedPipelines; + } + private: const std::vector<LiteParsedPipeline> _liteParsedPipelines; const PrivilegeVector _requiredPrivileges; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index f22eebb0f04..6e9c4776df4 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -76,8 +76,8 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceGraphL PrivilegeVector privileges{ Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}; - return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(nss), - std::move(privileges)); + return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>( + spec.fieldName(), std::move(nss), std::move(privileges)); } REGISTER_DOCUMENT_SOURCE(graphLookup, diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 51d92262b0f..5700f61d238 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -45,10 +45,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(request.getNamespaceString()); + return std::make_unique<LiteParsed>(spec.fieldName(), request.getNamespaceString()); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h index 62e8c237afe..103990f27a0 100644 --- a/src/mongo/db/pipeline/document_source_list_local_cursors.h +++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h @@ -52,9 +52,12 @@ public: class LiteParsed final : public LiteParsedDocumentSource { public: + explicit LiteParsed(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} + static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(); + return stdx::make_unique<LiteParsed>(spec.fieldName()); } stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 787b537b515..cdabd1ae443 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -59,11 +59,12 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - - return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec)); + return stdx::make_unique<LiteParsed>(spec.fieldName(), + listSessionsParseSpec(kStageName, spec)); } - explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} + explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec) + : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index aa7400bad0c..185ede0daa2 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -54,10 +54,12 @@ public: public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec)); + return stdx::make_unique<LiteParsed>(spec.fieldName(), + listSessionsParseSpec(kStageName, spec)); } - explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} + explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec) + : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 2646d125d32..ba175032f79 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -167,20 +167,20 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars // Recursively lite parse the nested pipeline, if one exists. auto pipelineElem = specObj["pipeline"]; - boost::optional<LiteParsedPipeline> liteParsedPipeline; + std::vector<LiteParsedPipeline> liteParsedPipelineVector; if (pipelineElem) { auto pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem)); AggregationRequest foreignAggReq(fromNss, std::move(pipeline)); - liteParsedPipeline = LiteParsedPipeline(foreignAggReq); - - auto pipelineInvolvedNamespaces = liteParsedPipeline->getInvolvedNamespaces(); + LiteParsedPipeline liteParsedPipeline(foreignAggReq); + auto pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); foreignNssSet.insert(pipelineInvolvedNamespaces.begin(), pipelineInvolvedNamespaces.end()); + liteParsedPipelineVector.push_back(std::move(liteParsedPipeline)); } - foreignNssSet.insert(fromNss); - - return stdx::make_unique<DocumentSourceLookUp::LiteParsed>( - std::move(fromNss), std::move(foreignNssSet), std::move(liteParsedPipeline)); + return stdx::make_unique<DocumentSourceLookUp::LiteParsed>(spec.fieldName(), + std::move(fromNss), + std::move(foreignNssSet), + std::move(liteParsedPipelineVector)); } REGISTER_DOCUMENT_SOURCE(lookup, diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index aa8ecf50768..d778162a194 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -56,10 +56,12 @@ public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); - LiteParsed(NamespaceString fromNss, + LiteParsed(std::string parseTimeName, + NamespaceString fromNss, stdx::unordered_set<NamespaceString> foreignNssSet, - boost::optional<LiteParsedPipeline> liteParsedPipeline) - : _fromNss{std::move(fromNss)}, + std::vector<LiteParsedPipeline> liteParsedPipeline) + : LiteParsedDocumentSource(std::move(parseTimeName)), + _fromNss{std::move(fromNss)}, _foreignNssSet(std::move(foreignNssSet)), _liteParsedPipeline(std::move(liteParsedPipeline)) {} @@ -73,18 +75,25 @@ public: &requiredPrivileges, Privilege(ResourcePattern::forExactNamespace(_fromNss), ActionType::find)); - if (_liteParsedPipeline) { + if (!_liteParsedPipeline.empty()) { + invariant(_liteParsedPipeline.size() == 1); Privilege::addPrivilegesToPrivilegeVector( - &requiredPrivileges, _liteParsedPipeline->requiredPrivileges(isMongos)); + &requiredPrivileges, _liteParsedPipeline[0].requiredPrivileges(isMongos)); } return requiredPrivileges; } + const std::vector<LiteParsedPipeline>& getSubPipelines() const override { + return _liteParsedPipeline; + } + private: const NamespaceString _fromNss; const stdx::unordered_set<NamespaceString> _foreignNssSet; - const boost::optional<LiteParsedPipeline> _liteParsedPipeline; + // Even though this will only ever hold 1 element, it is stored in a vector to satisfy + // 'getSubPipelines'. + const std::vector<LiteParsedPipeline> _liteParsedPipeline; }; GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 247dee7809f..654ee1a136b 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -70,8 +70,8 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::l PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)}; - return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(targetNss), - std::move(privileges)); + return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>( + spec.fieldName(), std::move(targetNss), std::move(privileges)); } REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::liteParse, DocumentSourceOut::createFromBson); diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.cpp b/src/mongo/db/pipeline/lite_parsed_document_source.cpp index 882758b9644..22d764d37e3 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.cpp +++ b/src/mongo/db/pipeline/lite_parsed_document_source.cpp @@ -32,18 +32,28 @@ #include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/util/string_map.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" +#include "mongo/db/stats/counters.h" namespace mongo { using Parser = LiteParsedDocumentSource::Parser; namespace { + +// Empty vector used by LiteParsedDocumentSources which do not have a sub pipeline. +std::vector<LiteParsedPipeline> kNoSubPipeline = {}; + StringMap<Parser> parserMap; } // namespace void LiteParsedDocumentSource::registerParser(const std::string& name, Parser parser) { parserMap[name] = parser; + // Initialize a counter for this document source to track how many times it is used. + invariant( + aggStageCounters.stageCounterMap + .insert(std::make_pair(name, std::make_unique<AggStageCounters::StageCounter>(name))) + .second); } std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse( @@ -62,4 +72,9 @@ std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse( return it->second(request, specElem); } + + +const std::vector<LiteParsedPipeline>& LiteParsedDocumentSource::getSubPipelines() const { + return kNoSubPipeline; } +} // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index c3173fb8212..84b7d4be557 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -44,6 +44,8 @@ namespace mongo { +class LiteParsedPipeline; + /** * A lightly parsed version of a DocumentSource. It is not executable and not guaranteed to return a * parse error when encountering an invalid specification. Instead, the purpose of this class is to @@ -52,6 +54,9 @@ namespace mongo { */ class LiteParsedDocumentSource { public: + LiteParsedDocumentSource(std::string parseTimeName) + : _parseTimeName(std::move(parseTimeName)) {} + virtual ~LiteParsedDocumentSource() = default; /* @@ -142,6 +147,22 @@ public: * UserException if not compatible. */ virtual void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {} + + /** + * Returns this document source's subpipelines. If none exist, a reference to an empty vector + * is returned. + */ + virtual const std::vector<LiteParsedPipeline>& getSubPipelines() const; + + /** + * Returns the name of the stage that this LiteParsedDocumentSource represents. + */ + const std::string& getParseTimeName() const { + return _parseTimeName; + } + +private: + std::string _parseTimeName; }; class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource { @@ -153,10 +174,11 @@ public: */ static std::unique_ptr<LiteParsedDocumentSourceDefault> parse(const AggregationRequest& request, const BSONElement& spec) { - return stdx::make_unique<LiteParsedDocumentSourceDefault>(); + return std::make_unique<LiteParsedDocumentSourceDefault>(spec.fieldName()); } - LiteParsedDocumentSourceDefault() = default; + LiteParsedDocumentSourceDefault(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); @@ -172,13 +194,20 @@ public: */ class LiteParsedDocumentSourceForeignCollections : public LiteParsedDocumentSource { public: - LiteParsedDocumentSourceForeignCollections(NamespaceString foreignNss, + LiteParsedDocumentSourceForeignCollections(std::string parseTimeName, + NamespaceString foreignNss, PrivilegeVector privileges) - : _foreignNssSet{std::move(foreignNss)}, _requiredPrivileges(std::move(privileges)) {} + : LiteParsedDocumentSource(std::move(parseTimeName)), + _foreignNssSet{std::move(foreignNss)}, + _requiredPrivileges(std::move(privileges)) {} - LiteParsedDocumentSourceForeignCollections(stdx::unordered_set<NamespaceString> foreignNssSet, + LiteParsedDocumentSourceForeignCollections(std::string parseTimeName, + stdx::unordered_set<NamespaceString> foreignNssSet, PrivilegeVector privileges) - : _foreignNssSet(std::move(foreignNssSet)), _requiredPrivileges(std::move(privileges)) {} + : LiteParsedDocumentSource(std::move(parseTimeName)), + _foreignNssSet(std::move(foreignNssSet)), + _requiredPrivileges(std::move(privileges)) {} + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return {_foreignNssSet}; diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index c717c20f275..69e56443a22 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/stats/counters.h" namespace mongo { @@ -152,6 +153,23 @@ public: } } + /** + * Increments global stage counters corresponding to the stages in this lite parsed pipeline. + */ + void tickGlobalStageCounters() const { + for (auto&& stage : _stageSpecs) { + // Tick counter corresponding to current stage. + auto entry = aggStageCounters.stageCounterMap.find(stage->getParseTimeName()); + invariant(entry != aggStageCounters.stageCounterMap.end()); + entry->second->counter.increment(1); + + // Recursively step through any sub-pipelines. + for (auto&& subPipeline : stage->getSubPipelines()) { + subPipeline.tickGlobalStageCounters(); + } + } + } + private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; NamespaceString _nss; diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp index af695566316..6e8bbfdadb2 100644 --- a/src/mongo/db/stats/counters.cpp +++ b/src/mongo/db/stats/counters.cpp @@ -205,4 +205,5 @@ void NetworkCounter::append(BSONObjBuilder& b) { OpCounters globalOpCounters; OpCounters replOpCounters; NetworkCounter networkCounter; -} +AggStageCounters aggStageCounters; +} // namespace mongo diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h index e9742797100..e30efaca25a 100644 --- a/src/mongo/db/stats/counters.h +++ b/src/mongo/db/stats/counters.h @@ -31,6 +31,8 @@ #pragma once +#include "mongo/base/counter.h" +#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/jsobj.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/basic.h" @@ -125,4 +127,21 @@ private: }; extern NetworkCounter networkCounter; -} + +class AggStageCounters { +public: + // Container for a stage count metric along with its corresponding counter. + struct StageCounter { + explicit StageCounter(const std::string& name) + : metric("aggStageCounters." + name, &counter) {} + + Counter64 counter; + ServerStatusMetricField<Counter64> metric; + }; + + // Map of aggregation stages to the number of occurrences. + stdx::unordered_map<std::string, std::unique_ptr<StageCounter>> stageCounterMap; +}; + +extern AggStageCounters aggStageCounters; +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 0fab00f2d19..7262819e5d1 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -1129,8 +1129,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, nsStruct.requestedNss = namespaces.requestedNss; nsStruct.executionNss = resolvedView->getNamespace(); - return ClusterAggregate::runAggregate( + auto viewStatus = ClusterAggregate::runAggregate( opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out); + if (viewStatus.isOK()) { + // If view execution succeeded, count the stages that are part of the view definition. + AggregationRequest viewPipeline{resolvedView->getNamespace(), + resolvedView->getPipeline()}; + LiteParsedPipeline viewLiteParsed(viewPipeline); + viewLiteParsed.tickGlobalStageCounters(); + } + return viewStatus; } return status; diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index f3b4fa36e71..b09d02b522f 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/s/commands/cluster_aggregate.h" namespace mongo { @@ -106,8 +107,13 @@ private: const auto& nss = aggregationRequest.getNamespaceString(); - return ClusterAggregate::runAggregate( + auto status = ClusterAggregate::runAggregate( opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, cmdObj, result); + if (status.isOK()) { + LiteParsedPipeline lpp(aggregationRequest); + lpp.tickGlobalStageCounters(); + } + return status; } } clusterPipelineCmd; |