diff options
32 files changed, 350 insertions, 59 deletions
diff --git a/jstests/noPassthrough/server_status_aggregation_stage_counter.js b/jstests/noPassthrough/server_status_aggregation_stage_counter.js new file mode 100644 index 00000000000..ead660a7cfd --- /dev/null +++ b/jstests/noPassthrough/server_status_aggregation_stage_counter.js @@ -0,0 +1,153 @@ +/** + * Tests for serverStatus metrics.stage stats. + * @tags: [requires_sharding] + */ +(function() { +"use strict"; + +// In memory map of stage names to their counters. Used to verify that serverStatus is +// incrementing the appropriate stages correctly across multiple pipelines. +let countersWeExpectToIncreaseMap = {}; + +function checkCounters(command, countersWeExpectToIncrease, countersWeExpectNotToIncrease = []) { + // Capture the pre-aggregation counts of the stages which we expect not to increase. + let metrics = db.serverStatus().metrics.aggStageCounters; + let noIncreaseCounterMap = {}; + for (const stage of countersWeExpectNotToIncrease) { + if (!countersWeExpectToIncreaseMap[stage]) { + countersWeExpectToIncreaseMap[stage] = 0; + } + noIncreaseCounterMap[stage] = metrics[stage]; + } + + // Update in memory map to reflect what each counter's count should be after running 'command'. + for (const stage of countersWeExpectToIncrease) { + if (!countersWeExpectToIncreaseMap[stage]) { + countersWeExpectToIncreaseMap[stage] = 0; + } + countersWeExpectToIncreaseMap[stage]++; + } + + // Run the command and update metrics to reflect the post-command serverStatus state. + command(); + metrics = db.serverStatus().metrics.aggStageCounters; + + // Verify that serverStatus reflects expected counters. + for (const stage of countersWeExpectToIncrease) { + assert.eq(metrics[stage], countersWeExpectToIncreaseMap[stage]); + } + + // Verify that the counters which we expect not to increase did not do so. + for (const stage of countersWeExpectNotToIncrease) { + assert.eq(metrics[stage], noIncreaseCounterMap[stage]); + } +} + +function runTests(db, coll) { + // Reset our counter map before running any aggregations. + countersWeExpectToIncreaseMap = {}; + + // Setup for agg stages which have nested pipelines. + assert.commandWorked(db[coll].insert([ + {"_id": 1, "item": "almonds", "price": 12, "quantity": 2}, + {"_id": 2, "item": "pecans", "price": 20, "quantity": 1}, + {"_id": 3} + ])); + + assert.commandWorked(db.inventory.insert([ + {"_id": 1, "sku": "almonds", description: "product 1", "instock": 120}, + {"_id": 2, "sku": "bread", description: "product 2", "instock": 80}, + {"_id": 3, "sku": "cashews", description: "product 3", "instock": 60}, + {"_id": 4, "sku": "pecans", description: "product 4", "instock": 70}, + {"_id": 5, "sku": null, description: "Incomplete"}, + {"_id": 6} + ])); + + // $skip + checkCounters(() => coll.aggregate([{$skip: 5}]).toArray(), ['$skip']); + // $project is an alias for $unset. + checkCounters(() => coll.aggregate([{$project: {title: 1, author: 1}}]).toArray(), + ['$project'], + ['$unset']); + // $count is an alias for $project and $group. + checkCounters( + () => coll.aggregate([{$count: "test"}]).toArray(), ['$count'], ['$project', '$group']); + + // $lookup + checkCounters( + () => coll.aggregate([{$lookup: {from: "inventory", pipeline: [{$match: {inStock: 70}}], as: "inventory_docs"}}]).toArray(), + ['$lookup', "$match"]); + + // $merge + checkCounters( + () => coll.aggregate([{ + $merge: + {into: coll.getName(), whenMatched: [{$set: {a: {$multiply: ["$a", 2]}}}]} + }]) + .toArray(), + ['$merge', "$set"]); + + // $facet + checkCounters( + () => + coll.aggregate([{ + $facet: {"a": [{$match: {price: {$exists: 1}}}], "b": [{$project: {title: 1}}]} + }]) + .toArray(), + ['$facet', '$match', "$project"]); + + // Verify that explain ticks counters. + checkCounters(() => coll.explain().aggregate([{$match: {a: 5}}]), ["$match"]); + + // Verify that a pipeline in an update ticks counters. + checkCounters(() => coll.update({_id: 5}, [{$addFields: {a: {$add: ['$a', 1]}}}]), + ["$addFields"], + ["$set"]); + + // Verify that a stage which appears multiple times in a pipeline has an accurate count. + checkCounters( + () => + coll.aggregate([ + { + $facet: + {"a": [{$match: {price: {$exists: 1}}}], "b": [{$project: {title: 1}}]} + }, + { + $facet: { + "c": [{$match: {instock: {$exists: 1}}}], + "d": [{$project: {title: 0}}] + } + } + ]) + .toArray(), + ["$facet", "$match", "$project", "$facet", "$match", "$project"]); + + // Verify that a pipeline used in a view ticks counters. + const viewName = "counterView"; + assert.commandWorked(db.createView(viewName, coll.getName(), [{"$project": {_id: 0}}])); + // Note that $project's counter will also be ticked since the $project used to generate the view + // will be stitched together with the pipeline specified to the aggregate command. + checkCounters(() => db[viewName].aggregate([{$match: {a: 5}}]).toArray(), + ["$match", "$project"]); +} + +// Standalone +const conn = MongoRunner.runMongod(); +assert.neq(null, conn, "mongod was unable to start up"); +let db = conn.getDB(jsTest.name()); +const collName = jsTest.name(); +let coll = db[collName]; +runTests(db, coll); + +MongoRunner.stopMongod(conn); + +// Sharded cluster +const st = new ShardingTest({shards: 2}); +db = st.s.getDB(jsTest.name()); +coll = db[collName]; +st.shardColl(coll.getFullName(), {_id: 1}, {_id: "hashed"}); + +runTests(db, coll); + +st.stop(); +}()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index ac57422ea33..369106d769b 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -52,7 +52,6 @@ #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" @@ -767,6 +766,9 @@ Status runAggregate(OperationContext* opCtx, pins.emplace_back(std::move(pin)); } + // Report usage statistics for each stage in the pipeline. + liteParsedPipeline.tickGlobalStageCounters(); + // If both explain and cursor are specified, explain wins. if (expCtx->explain) { auto explainExecutor = pins[0].getCursor()->getExecutor(); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 269da4407d1..fd5a9c52c55 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -59,6 +59,7 @@ #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/query/collection_query_info.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" @@ -841,6 +842,14 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who if (!canContinue) break; } + + // If this was a pipeline style update, record which stages were being used. + auto updateMod = singleOp.getU(); + if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) { + auto pipeline = + LiteParsedPipeline(wholeOp.getNamespace(), updateMod.getUpdatePipeline()); + pipeline.tickGlobalStageCounters(); + } } return out; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 5ad3adefc5b..ecd9b51397c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -184,6 +184,7 @@ env.Library( 'lite_parsed_pipeline.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/stats/counters', 'aggregation_request', ] ) diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index b7bf369af30..eb39e8f90ea 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -51,10 +51,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsed>(nss); + return std::make_unique<LiteParsed>(spec.fieldName(), nss); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} bool isChangeStream() const final { return true; diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 60401817729..6e97cf562c6 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -45,10 +45,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsed>(nss); + return std::make_unique<LiteParsed>(spec.fieldName(), nss); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} bool isCollStats() const final { return true; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 85ca3cf8835..e0d40c619c0 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -100,7 +100,8 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li } } - return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps); + return std::make_unique<DocumentSourceCurrentOp::LiteParsed>( + spec.fieldName(), allUsers, localOps); } const char* DocumentSourceCurrentOp::getSourceName() const { diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 37abba0e53a..ed2093c449b 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -50,8 +50,10 @@ public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); - LiteParsed(UserMode allUsers, LocalOpsMode localOps) - : _allUsers(allUsers), _localOps(localOps) {} + LiteParsed(std::string parseTimeName, UserMode allUsers, LocalOpsMode localOps) + : LiteParsedDocumentSource(std::move(parseTimeName)), + _allUsers(allUsers), + _localOps(localOps) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index fcbfc566c0c..9414a87ad5c 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -114,7 +114,8 @@ std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed liteParsedPipelines.emplace_back(LiteParsedPipeline(nss, rawPipeline.second)); } - return std::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines)); + return std::make_unique<DocumentSourceFacet::LiteParsed>(spec.fieldName(), + std::move(liteParsedPipelines)); } REGISTER_DOCUMENT_SOURCE(facet, diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index b83d34597eb..a78a89dab81 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -72,8 +72,9 @@ public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); - LiteParsed(std::vector<LiteParsedPipeline> pipelines) - : LiteParsedDocumentSourceNestedPipelines(boost::none, std::move(pipelines)) {} + LiteParsed(std::string parseTimeName, std::vector<LiteParsedPipeline> pipelines) + : LiteParsedDocumentSourceNestedPipelines( + std::move(parseTimeName), boost::none, std::move(pipelines)) {} PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const override final { diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 7b76bfc9d26..b714ed53c9a 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -85,7 +85,7 @@ std::unique_ptr<DocumentSourceGraphLookUp::LiteParsed> DocumentSourceGraphLookUp uassert(ErrorCodes::InvalidNamespace, str::stream() << "invalid $graphLookup namespace: " << fromNss.ns(), fromNss.isValid()); - return std::make_unique<LiteParsed>(std::move(fromNss)); + return std::make_unique<LiteParsed>(spec.fieldName(), std::move(fromNss)); } REGISTER_DOCUMENT_SOURCE(graphLookup, diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 979c1dde866..f9124816b08 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -43,8 +43,9 @@ public: class LiteParsed : public LiteParsedDocumentSourceForeignCollection { public: - LiteParsed(NamespaceString foreignNss) - : LiteParsedDocumentSourceForeignCollection(std::move(foreignNss)) {} + LiteParsed(std::string parseTimeName, NamespaceString foreignNss) + : LiteParsedDocumentSourceForeignCollection(std::move(parseTimeName), + std::move(foreignNss)) {} static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index f43bff49df6..f70a3d48388 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -46,10 +46,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsed>(nss); + return std::make_unique<LiteParsed>(spec.fieldName(), nss); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 40af292031d..35ab6dfe54c 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -49,9 +49,12 @@ public: public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsed>(); + return std::make_unique<LiteParsed>(spec.fieldName()); } + explicit LiteParsed(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); } diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index dbe545c3096..4615296a0ed 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -60,10 +60,12 @@ public: const BSONElement& spec) { return std::make_unique<LiteParsed>( + spec.fieldName(), listSessionsParseSpec(DocumentSourceListLocalSessions::kStageName, spec)); } - explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} + explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec) + : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index ed8dcdf177d..b8287a843a8 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -64,10 +64,12 @@ public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { return std::make_unique<LiteParsed>( + spec.fieldName(), listSessionsParseSpec(DocumentSourceListSessions::kStageName, spec)); } - explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} + explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec) + : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 4e30b9fc5dc..ddbe6ac0e59 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -138,8 +138,8 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars liteParsedPipeline = LiteParsedPipeline(fromNss, pipeline); } - return std::make_unique<DocumentSourceLookUp::LiteParsed>(std::move(fromNss), - std::move(liteParsedPipeline)); + return std::make_unique<DocumentSourceLookUp::LiteParsed>( + spec.fieldName(), std::move(fromNss), std::move(liteParsedPipeline)); } PrivilegeVector DocumentSourceLookUp::LiteParsed::requiredPrivileges( diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 7138db9748f..4917ea49191 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -64,8 +64,11 @@ public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); - LiteParsed(NamespaceString foreignNss, boost::optional<LiteParsedPipeline> pipeline) - : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss), std::move(pipeline)) {} + LiteParsed(std::string parseTimeName, + NamespaceString foreignNss, + boost::optional<LiteParsedPipeline> pipeline) + : LiteParsedDocumentSourceNestedPipelines( + std::move(parseTimeName), std::move(foreignNss), std::move(pipeline)) {} /** * Lookup from a sharded collection may not be allowed. diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index e953e813bc0..fbcb54950b8 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -329,19 +329,28 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed MergeWhenMatchedMode_serializer(whenMatched), MergeWhenNotMatchedMode_serializer(whenNotMatched)), isSupportedMergeMode(whenMatched, whenNotMatched)); - - return std::make_unique<DocumentSourceMerge::LiteParsed>( - std::move(targetNss), whenMatched, whenNotMatched); + boost::optional<LiteParsedPipeline> liteParsedPipeline; + if (whenMatched == MergeWhenMatchedModeEnum::kPipeline) { + auto pipeline = mergeSpec.getWhenMatched()->pipeline; + invariant(pipeline); + liteParsedPipeline = LiteParsedPipeline(nss, *pipeline); + } + return std::make_unique<DocumentSourceMerge::LiteParsed>(spec.fieldName(), + std::move(targetNss), + whenMatched, + whenNotMatched, + std::move(liteParsedPipeline)); } PrivilegeVector DocumentSourceMerge::LiteParsed::requiredPrivileges( bool isMongos, bool bypassDocumentValidation) const { + invariant(_foreignNss); auto actions = ActionSet{getDescriptors().at({_whenMatched, _whenNotMatched}).actions}; if (bypassDocumentValidation) { actions.addAction(ActionType::bypassDocumentValidation); } - return {{ResourcePattern::forExactNamespace(_foreignNss), actions}}; + return {{ResourcePattern::forExactNamespace(*_foreignNss), actions}}; } DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs, diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 808836e0d77..464b4cfdaa0 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source_merge_gen.h" #include "mongo/db/pipeline/document_source_writer.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" namespace mongo { @@ -68,21 +69,22 @@ public: * collection is unsharded. This ensures that the unique index verification happens once on * mongos and can be bypassed on the shards. */ - class LiteParsed final : public LiteParsedDocumentSourceForeignCollection { + class LiteParsed final : public LiteParsedDocumentSourceNestedPipelines { public: - LiteParsed(NamespaceString foreignNss, + LiteParsed(std::string parseTimeName, + NamespaceString foreignNss, MergeWhenMatchedModeEnum whenMatched, - MergeWhenNotMatchedModeEnum whenNotMatched) - : LiteParsedDocumentSourceForeignCollection(std::move(foreignNss)), + MergeWhenNotMatchedModeEnum whenNotMatched, + boost::optional<LiteParsedPipeline> onMatchedPipeline) + : LiteParsedDocumentSourceNestedPipelines( + std::move(parseTimeName), std::move(foreignNss), std::move(onMatchedPipeline)), _whenMatched(whenMatched), _whenNotMatched(whenNotMatched) {} - using LiteParsedDocumentSourceForeignCollection::LiteParsedDocumentSourceForeignCollection; - static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); - bool allowedToPassthroughFromMongos() const final { + bool allowedToPassthroughFromMongos() const { return false; } diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 93122dac504..3cde388224b 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -91,7 +91,7 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), targetNss.isValid()); - return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss)); + return std::make_unique<DocumentSourceOut::LiteParsed>(spec.fieldName(), std::move(targetNss)); } std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse( @@ -106,7 +106,7 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), targetNss.isValid()); - return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss)); + return std::make_unique<DocumentSourceOut::LiteParsed>(spec.fieldName(), std::move(targetNss)); } void DocumentSourceOut::initialize() { diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index d706f4f3ba4..06aa28d908f 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -42,10 +42,11 @@ public: public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsed>(nss); + return std::make_unique<LiteParsed>(spec.fieldName(), nss); } - explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {} + explicit LiteParsed(std::string parseTimeName, NamespaceString nss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const override { // There are no foreign collections. diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 838923e3392..151ea99c58a 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -92,8 +92,8 @@ std::unique_ptr<DocumentSourceUnionWith::LiteParsed> DocumentSourceUnionWith::Li } } - return std::make_unique<DocumentSourceUnionWith::LiteParsed>(std::move(unionNss), - std::move(liteParsedPipeline)); + return std::make_unique<DocumentSourceUnionWith::LiteParsed>( + spec.fieldName(), std::move(unionNss), std::move(liteParsedPipeline)); } PrivilegeVector DocumentSourceUnionWith::LiteParsed::requiredPrivileges( diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index 69f1fbaf073..3225a1e6468 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -50,8 +50,11 @@ public: static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec); - LiteParsed(NamespaceString foreignNss, boost::optional<LiteParsedPipeline> pipeline) - : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss), std::move(pipeline)) {} + LiteParsed(std::string parseTimeName, + NamespaceString foreignNss, + boost::optional<LiteParsedPipeline> pipeline) + : LiteParsedDocumentSourceNestedPipelines( + std::move(parseTimeName), std::move(foreignNss), std::move(pipeline)) {} PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const override final; diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.cpp b/src/mongo/db/pipeline/lite_parsed_document_source.cpp index 32c76a8d37e..a21064b8387 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.cpp +++ b/src/mongo/db/pipeline/lite_parsed_document_source.cpp @@ -30,20 +30,26 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" - #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/util/string_map.h" +#include "mongo/db/stats/counters.h" namespace mongo { using Parser = LiteParsedDocumentSource::Parser; namespace { + +// Empty vector used by LiteParsedDocumentSources which do not have a sub pipeline. +inline static std::vector<LiteParsedPipeline> kNoSubPipeline = {}; + StringMap<Parser> parserMap; + } // namespace void LiteParsedDocumentSource::registerParser(const std::string& name, Parser parser) { parserMap[name] = parser; + // Initialize a counter for this document source to track how many times it is used. + aggStageCounters.stageCounterMap[name] = std::make_unique<AggStageCounters::StageCounter>(name); } std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse( @@ -63,14 +69,24 @@ std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse( return it->second(nss, specElem); } +const std::vector<LiteParsedPipeline>& LiteParsedDocumentSource::getSubPipelines() const { + return kNoSubPipeline; +} + LiteParsedDocumentSourceNestedPipelines::LiteParsedDocumentSourceNestedPipelines( - boost::optional<NamespaceString> foreignNss, std::vector<LiteParsedPipeline> pipelines) - : _foreignNss(std::move(foreignNss)), _pipelines(std::move(pipelines)) {} + std::string parseTimeName, + boost::optional<NamespaceString> foreignNss, + std::vector<LiteParsedPipeline> pipelines) + : LiteParsedDocumentSource(std::move(parseTimeName)), + _foreignNss(std::move(foreignNss)), + _pipelines(std::move(pipelines)) {} LiteParsedDocumentSourceNestedPipelines::LiteParsedDocumentSourceNestedPipelines( - boost::optional<NamespaceString> foreignNss, boost::optional<LiteParsedPipeline> pipeline) - : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss), - std::vector<LiteParsedPipeline>{}) { + std::string parseTimeName, + boost::optional<NamespaceString> foreignNss, + boost::optional<LiteParsedPipeline> pipeline) + : LiteParsedDocumentSourceNestedPipelines( + std::move(parseTimeName), std::move(foreignNss), std::vector<LiteParsedPipeline>{}) { if (pipeline) _pipelines.emplace_back(std::move(pipeline.get())); } diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index bdeb7f7f554..95c28f49510 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -35,6 +35,7 @@ #include <vector> #include "mongo/db/auth/privilege.h" +#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/namespace_string.h" #include "mongo/db/read_concern_support_result.h" #include "mongo/db/repl/read_concern_args.h" @@ -52,6 +53,9 @@ class LiteParsedPipeline; */ class LiteParsedDocumentSource { public: + LiteParsedDocumentSource(std::string parseTimeName) + : _parseTimeName(std::move(parseTimeName)) {} + virtual ~LiteParsedDocumentSource() = default; /* @@ -64,7 +68,6 @@ public: */ using Parser = std::function<std::unique_ptr<LiteParsedDocumentSource>(const NamespaceString&, const BSONElement&)>; - /** * Registers a DocumentSource with a spec parsing function, so that when a stage with the given * name is encountered, it will call 'parser' to construct that stage's specification object. @@ -146,6 +149,19 @@ public: */ virtual void assertSupportsMultiDocumentTransaction() const {} + /** + * Returns this document source's subpipelines. If none exist, a reference to an empty vector + * is returned. + */ + virtual const std::vector<LiteParsedPipeline>& getSubPipelines() const; + + /** + * Returns the name of the stage that this LiteParsedDocumentSource represents. + */ + const std::string& getParseTimeName() const { + return _parseTimeName; + } + protected: void transactionNotSupported(StringData stageName) const { uasserted(ErrorCodes::OperationNotSupportedInTransaction, @@ -175,6 +191,9 @@ protected: return onlySingleReadConcernSupported( stageName, repl::ReadConcernLevel::kLocalReadConcern, level); } + +private: + std::string _parseTimeName; }; class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource { @@ -186,10 +205,11 @@ public: */ static std::unique_ptr<LiteParsedDocumentSourceDefault> parse(const NamespaceString& nss, const BSONElement& spec) { - return std::make_unique<LiteParsedDocumentSourceDefault>(); + return std::make_unique<LiteParsedDocumentSourceDefault>(spec.fieldName()); } - LiteParsedDocumentSourceDefault() = default; + LiteParsedDocumentSourceDefault(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return stdx::unordered_set<NamespaceString>(); @@ -205,8 +225,8 @@ public: */ class LiteParsedDocumentSourceForeignCollection : public LiteParsedDocumentSource { public: - LiteParsedDocumentSourceForeignCollection(NamespaceString foreignNss) - : _foreignNss(std::move(foreignNss)) {} + LiteParsedDocumentSourceForeignCollection(std::string parseTimeName, NamespaceString foreignNss) + : LiteParsedDocumentSource(std::move(parseTimeName)), _foreignNss(std::move(foreignNss)) {} stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { return {_foreignNss}; @@ -224,18 +244,24 @@ protected: */ class LiteParsedDocumentSourceNestedPipelines : public LiteParsedDocumentSource { public: - LiteParsedDocumentSourceNestedPipelines(boost::optional<NamespaceString> foreignNss, + LiteParsedDocumentSourceNestedPipelines(std::string parseTimeName, + boost::optional<NamespaceString> foreignNss, std::vector<LiteParsedPipeline> pipelines); - LiteParsedDocumentSourceNestedPipelines(boost::optional<NamespaceString> foreignNss, + LiteParsedDocumentSourceNestedPipelines(std::string parseTimeName, + boost::optional<NamespaceString> foreignNss, boost::optional<LiteParsedPipeline> pipeline); stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final override; - bool allowedToPassthroughFromMongos() const final override; + bool allowedToPassthroughFromMongos() const override; bool allowShardedForeignCollection(NamespaceString nss) const override; + const std::vector<LiteParsedPipeline>& getSubPipelines() const override { + return _pipelines; + } + protected: boost::optional<NamespaceString> _foreignNss; std::vector<LiteParsedPipeline> _pipelines; diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp index 954239fa928..153912ef448 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/operation_context.h" +#include "mongo/db/stats/counters.h" namespace mongo { @@ -110,4 +111,17 @@ void LiteParsedPipeline::verifyIsSupported( } } +void LiteParsedPipeline::tickGlobalStageCounters() const { + for (auto&& stage : _stageSpecs) { + // Tick counter corresponding to current stage. + aggStageCounters.stageCounterMap.find(stage->getParseTimeName()) + ->second->counter.increment(1); + + // Recursively step through any sub-pipelines. + for (auto&& subPipeline : stage->getSubPipelines()) { + subPipeline.tickGlobalStageCounters(); + } + } +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 7d6ef9d7355..d80a470c9b0 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -157,6 +157,11 @@ public: return !_stageSpecs.empty() && _stageSpecs.front()->isInitialSource(); } + /** + * Increments global stage counters corresponding to the stages in this lite parsed pipeline. + */ + void tickGlobalStageCounters() const; + private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; }; diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp index 0e5d0595dcd..f4ee4c71676 100644 --- a/src/mongo/db/stats/counters.cpp +++ b/src/mongo/db/stats/counters.cpp @@ -231,4 +231,5 @@ OpCounters globalOpCounters; OpCounters replOpCounters; NetworkCounter networkCounter; AuthCounter authCounter; +AggStageCounters aggStageCounters; } // namespace mongo diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h index edc17eee2bd..36037052cc4 100644 --- a/src/mongo/db/stats/counters.h +++ b/src/mongo/db/stats/counters.h @@ -31,12 +31,14 @@ #include <map> +#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/jsobj.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/basic.h" #include "mongo/rpc/message.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/processinfo.h" +#include "mongo/util/string_map.h" #include "mongo/util/with_alignment.h" namespace mongo { @@ -192,4 +194,20 @@ private: MechanismMap _mechanisms; }; extern AuthCounter authCounter; + +class AggStageCounters { +public: + // Container for a stage count metric along with its corresponding counter. + struct StageCounter { + StageCounter(StringData name) : metric("aggStageCounters." + name, &counter) {} + + Counter64 counter; + ServerStatusMetricField<Counter64> metric; + }; + + // Map of aggregation stages to the number of occurrences. + StringMap<std::unique_ptr<StageCounter>> stageCounterMap = {}; +}; + +extern AggStageCounters aggStageCounters; } // namespace mongo diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 29ed31f4393..a2f335ee270 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -40,6 +40,7 @@ #include "mongo/db/curop.h" #include "mongo/db/lasterror.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/executor/task_executor_pool.h" @@ -507,6 +508,15 @@ private: debug.additiveMetrics.nMatched = response.getN() - (debug.upsert ? response.sizeUpsertDetails() : 0); debug.additiveMetrics.nModified = response.getNModified(); + for (auto&& update : _batchedRequest.getUpdateRequest().getUpdates()) { + // If this was a pipeline style update, record which stages were being used. + auto updateMod = update.getU(); + if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) { + auto pipeline = LiteParsedPipeline(_batchedRequest.getNS(), + updateMod.getUpdatePipeline()); + pipeline.tickGlobalStageCounters(); + } + } break; case BatchedCommandRequest::BatchType_Delete: for (size_t i = 0; i < numAttempts; ++i) { diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index b19183096e6..61c3d034f9b 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -345,9 +345,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, MONGO_UNREACHABLE; }(); - if (status.isOK()) + if (status.isOK()) { updateHostsTargetedMetrics(opCtx, namespaces.executionNss, routingInfo, involvedNamespaces); - + // Report usage statistics for each stage in the pipeline. + liteParsedPipeline.tickGlobalStageCounters(); + } return status; } |