summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/server_status_aggregation_stage_counter.js153
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp4
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp9
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h5
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h5
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h6
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h5
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h5
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h5
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.h5
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_sessions.h4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h7
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_merge.h16
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.h5
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.h7
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.cpp30
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h42
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.cpp14
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h5
-rw-r--r--src/mongo/db/stats/counters.cpp1
-rw-r--r--src/mongo/db/stats/counters.h18
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp10
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp6
32 files changed, 350 insertions, 59 deletions
diff --git a/jstests/noPassthrough/server_status_aggregation_stage_counter.js b/jstests/noPassthrough/server_status_aggregation_stage_counter.js
new file mode 100644
index 00000000000..ead660a7cfd
--- /dev/null
+++ b/jstests/noPassthrough/server_status_aggregation_stage_counter.js
@@ -0,0 +1,153 @@
+/**
+ * Tests for serverStatus metrics.stage stats.
+ * @tags: [requires_sharding]
+ */
+(function() {
+"use strict";
+
+// In memory map of stage names to their counters. Used to verify that serverStatus is
+// incrementing the appropriate stages correctly across multiple pipelines.
+let countersWeExpectToIncreaseMap = {};
+
+function checkCounters(command, countersWeExpectToIncrease, countersWeExpectNotToIncrease = []) {
+ // Capture the pre-aggregation counts of the stages which we expect not to increase.
+ let metrics = db.serverStatus().metrics.aggStageCounters;
+ let noIncreaseCounterMap = {};
+ for (const stage of countersWeExpectNotToIncrease) {
+ if (!countersWeExpectToIncreaseMap[stage]) {
+ countersWeExpectToIncreaseMap[stage] = 0;
+ }
+ noIncreaseCounterMap[stage] = metrics[stage];
+ }
+
+ // Update in memory map to reflect what each counter's count should be after running 'command'.
+ for (const stage of countersWeExpectToIncrease) {
+ if (!countersWeExpectToIncreaseMap[stage]) {
+ countersWeExpectToIncreaseMap[stage] = 0;
+ }
+ countersWeExpectToIncreaseMap[stage]++;
+ }
+
+ // Run the command and update metrics to reflect the post-command serverStatus state.
+ command();
+ metrics = db.serverStatus().metrics.aggStageCounters;
+
+ // Verify that serverStatus reflects expected counters.
+ for (const stage of countersWeExpectToIncrease) {
+ assert.eq(metrics[stage], countersWeExpectToIncreaseMap[stage]);
+ }
+
+ // Verify that the counters which we expect not to increase did not do so.
+ for (const stage of countersWeExpectNotToIncrease) {
+ assert.eq(metrics[stage], noIncreaseCounterMap[stage]);
+ }
+}
+
+function runTests(db, coll) {
+ // Reset our counter map before running any aggregations.
+ countersWeExpectToIncreaseMap = {};
+
+ // Setup for agg stages which have nested pipelines.
+ assert.commandWorked(db[coll].insert([
+ {"_id": 1, "item": "almonds", "price": 12, "quantity": 2},
+ {"_id": 2, "item": "pecans", "price": 20, "quantity": 1},
+ {"_id": 3}
+ ]));
+
+ assert.commandWorked(db.inventory.insert([
+ {"_id": 1, "sku": "almonds", description: "product 1", "instock": 120},
+ {"_id": 2, "sku": "bread", description: "product 2", "instock": 80},
+ {"_id": 3, "sku": "cashews", description: "product 3", "instock": 60},
+ {"_id": 4, "sku": "pecans", description: "product 4", "instock": 70},
+ {"_id": 5, "sku": null, description: "Incomplete"},
+ {"_id": 6}
+ ]));
+
+ // $skip
+ checkCounters(() => coll.aggregate([{$skip: 5}]).toArray(), ['$skip']);
+ // $project is an alias for $unset.
+ checkCounters(() => coll.aggregate([{$project: {title: 1, author: 1}}]).toArray(),
+ ['$project'],
+ ['$unset']);
+ // $count is an alias for $project and $group.
+ checkCounters(
+ () => coll.aggregate([{$count: "test"}]).toArray(), ['$count'], ['$project', '$group']);
+
+ // $lookup
+ checkCounters(
+ () => coll.aggregate([{$lookup: {from: "inventory", pipeline: [{$match: {inStock: 70}}], as: "inventory_docs"}}]).toArray(),
+ ['$lookup', "$match"]);
+
+ // $merge
+ checkCounters(
+ () => coll.aggregate([{
+ $merge:
+ {into: coll.getName(), whenMatched: [{$set: {a: {$multiply: ["$a", 2]}}}]}
+ }])
+ .toArray(),
+ ['$merge', "$set"]);
+
+ // $facet
+ checkCounters(
+ () =>
+ coll.aggregate([{
+ $facet: {"a": [{$match: {price: {$exists: 1}}}], "b": [{$project: {title: 1}}]}
+ }])
+ .toArray(),
+ ['$facet', '$match', "$project"]);
+
+ // Verify that explain ticks counters.
+ checkCounters(() => coll.explain().aggregate([{$match: {a: 5}}]), ["$match"]);
+
+ // Verify that a pipeline in an update ticks counters.
+ checkCounters(() => coll.update({_id: 5}, [{$addFields: {a: {$add: ['$a', 1]}}}]),
+ ["$addFields"],
+ ["$set"]);
+
+ // Verify that a stage which appears multiple times in a pipeline has an accurate count.
+ checkCounters(
+ () =>
+ coll.aggregate([
+ {
+ $facet:
+ {"a": [{$match: {price: {$exists: 1}}}], "b": [{$project: {title: 1}}]}
+ },
+ {
+ $facet: {
+ "c": [{$match: {instock: {$exists: 1}}}],
+ "d": [{$project: {title: 0}}]
+ }
+ }
+ ])
+ .toArray(),
+ ["$facet", "$match", "$project", "$facet", "$match", "$project"]);
+
+ // Verify that a pipeline used in a view ticks counters.
+ const viewName = "counterView";
+ assert.commandWorked(db.createView(viewName, coll.getName(), [{"$project": {_id: 0}}]));
+ // Note that $project's counter will also be ticked since the $project used to generate the view
+ // will be stitched together with the pipeline specified to the aggregate command.
+ checkCounters(() => db[viewName].aggregate([{$match: {a: 5}}]).toArray(),
+ ["$match", "$project"]);
+}
+
+// Standalone
+const conn = MongoRunner.runMongod();
+assert.neq(null, conn, "mongod was unable to start up");
+let db = conn.getDB(jsTest.name());
+const collName = jsTest.name();
+let coll = db[collName];
+runTests(db, coll);
+
+MongoRunner.stopMongod(conn);
+
+// Sharded cluster
+const st = new ShardingTest({shards: 2});
+db = st.s.getDB(jsTest.name());
+coll = db[collName];
+st.shardColl(coll.getFullName(), {_id: 1}, {_id: "hashed"});
+
+runTests(db, coll);
+
+st.stop();
+}());
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index ac57422ea33..369106d769b 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -52,7 +52,6 @@
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
@@ -767,6 +766,9 @@ Status runAggregate(OperationContext* opCtx,
pins.emplace_back(std::move(pin));
}
+ // Report usage statistics for each stage in the pipeline.
+ liteParsedPipeline.tickGlobalStageCounters();
+
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
auto explainExecutor = pins[0].getCursor()->getExecutor();
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 269da4407d1..fd5a9c52c55 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
@@ -841,6 +842,14 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
if (!canContinue)
break;
}
+
+ // If this was a pipeline style update, record which stages were being used.
+ auto updateMod = singleOp.getU();
+ if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) {
+ auto pipeline =
+ LiteParsedPipeline(wholeOp.getNamespace(), updateMod.getUpdatePipeline());
+ pipeline.tickGlobalStageCounters();
+ }
}
return out;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 5ad3adefc5b..ecd9b51397c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -184,6 +184,7 @@ env.Library(
'lite_parsed_pipeline.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/stats/counters',
'aggregation_request',
]
)
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index b7bf369af30..eb39e8f90ea 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -51,10 +51,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(nss);
+ return std::make_unique<LiteParsed>(spec.fieldName(), nss);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
bool isChangeStream() const final {
return true;
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 60401817729..6e97cf562c6 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -45,10 +45,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(nss);
+ return std::make_unique<LiteParsed>(spec.fieldName(), nss);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
bool isCollStats() const final {
return true;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 85ca3cf8835..e0d40c619c0 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -100,7 +100,8 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li
}
}
- return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps);
+ return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(
+ spec.fieldName(), allUsers, localOps);
}
const char* DocumentSourceCurrentOp::getSourceName() const {
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 37abba0e53a..ed2093c449b 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -50,8 +50,10 @@ public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
- LiteParsed(UserMode allUsers, LocalOpsMode localOps)
- : _allUsers(allUsers), _localOps(localOps) {}
+ LiteParsed(std::string parseTimeName, UserMode allUsers, LocalOpsMode localOps)
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _allUsers(allUsers),
+ _localOps(localOps) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index fcbfc566c0c..9414a87ad5c 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -114,7 +114,8 @@ std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed
liteParsedPipelines.emplace_back(LiteParsedPipeline(nss, rawPipeline.second));
}
- return std::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines));
+ return std::make_unique<DocumentSourceFacet::LiteParsed>(spec.fieldName(),
+ std::move(liteParsedPipelines));
}
REGISTER_DOCUMENT_SOURCE(facet,
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index b83d34597eb..a78a89dab81 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -72,8 +72,9 @@ public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
- LiteParsed(std::vector<LiteParsedPipeline> pipelines)
- : LiteParsedDocumentSourceNestedPipelines(boost::none, std::move(pipelines)) {}
+ LiteParsed(std::string parseTimeName, std::vector<LiteParsedPipeline> pipelines)
+ : LiteParsedDocumentSourceNestedPipelines(
+ std::move(parseTimeName), boost::none, std::move(pipelines)) {}
PrivilegeVector requiredPrivileges(bool isMongos,
bool bypassDocumentValidation) const override final {
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 7b76bfc9d26..b714ed53c9a 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -85,7 +85,7 @@ std::unique_ptr<DocumentSourceGraphLookUp::LiteParsed> DocumentSourceGraphLookUp
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "invalid $graphLookup namespace: " << fromNss.ns(),
fromNss.isValid());
- return std::make_unique<LiteParsed>(std::move(fromNss));
+ return std::make_unique<LiteParsed>(spec.fieldName(), std::move(fromNss));
}
REGISTER_DOCUMENT_SOURCE(graphLookup,
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 979c1dde866..f9124816b08 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -43,8 +43,9 @@ public:
class LiteParsed : public LiteParsedDocumentSourceForeignCollection {
public:
- LiteParsed(NamespaceString foreignNss)
- : LiteParsedDocumentSourceForeignCollection(std::move(foreignNss)) {}
+ LiteParsed(std::string parseTimeName, NamespaceString foreignNss)
+ : LiteParsedDocumentSourceForeignCollection(std::move(parseTimeName),
+ std::move(foreignNss)) {}
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index f43bff49df6..f70a3d48388 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -46,10 +46,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(nss);
+ return std::make_unique<LiteParsed>(spec.fieldName(), nss);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index 40af292031d..35ab6dfe54c 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -49,9 +49,12 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>();
+ return std::make_unique<LiteParsed>(spec.fieldName());
}
+ explicit LiteParsed(std::string parseTimeName)
+ : LiteParsedDocumentSource(std::move(parseTimeName)) {}
+
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
}
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index dbe545c3096..4615296a0ed 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -60,10 +60,12 @@ public:
const BSONElement& spec) {
return std::make_unique<LiteParsed>(
+ spec.fieldName(),
listSessionsParseSpec(DocumentSourceListLocalSessions::kStageName, spec));
}
- explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
+ explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index ed8dcdf177d..b8287a843a8 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -64,10 +64,12 @@ public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
return std::make_unique<LiteParsed>(
+ spec.fieldName(),
listSessionsParseSpec(DocumentSourceListSessions::kStageName, spec));
}
- explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
+ explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 4e30b9fc5dc..ddbe6ac0e59 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -138,8 +138,8 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
liteParsedPipeline = LiteParsedPipeline(fromNss, pipeline);
}
- return std::make_unique<DocumentSourceLookUp::LiteParsed>(std::move(fromNss),
- std::move(liteParsedPipeline));
+ return std::make_unique<DocumentSourceLookUp::LiteParsed>(
+ spec.fieldName(), std::move(fromNss), std::move(liteParsedPipeline));
}
PrivilegeVector DocumentSourceLookUp::LiteParsed::requiredPrivileges(
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 7138db9748f..4917ea49191 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -64,8 +64,11 @@ public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
- LiteParsed(NamespaceString foreignNss, boost::optional<LiteParsedPipeline> pipeline)
- : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss), std::move(pipeline)) {}
+ LiteParsed(std::string parseTimeName,
+ NamespaceString foreignNss,
+ boost::optional<LiteParsedPipeline> pipeline)
+ : LiteParsedDocumentSourceNestedPipelines(
+ std::move(parseTimeName), std::move(foreignNss), std::move(pipeline)) {}
/**
* Lookup from a sharded collection may not be allowed.
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index e953e813bc0..fbcb54950b8 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -329,19 +329,28 @@ std::unique_ptr<DocumentSourceMerge::LiteParsed> DocumentSourceMerge::LiteParsed
MergeWhenMatchedMode_serializer(whenMatched),
MergeWhenNotMatchedMode_serializer(whenNotMatched)),
isSupportedMergeMode(whenMatched, whenNotMatched));
-
- return std::make_unique<DocumentSourceMerge::LiteParsed>(
- std::move(targetNss), whenMatched, whenNotMatched);
+ boost::optional<LiteParsedPipeline> liteParsedPipeline;
+ if (whenMatched == MergeWhenMatchedModeEnum::kPipeline) {
+ auto pipeline = mergeSpec.getWhenMatched()->pipeline;
+ invariant(pipeline);
+ liteParsedPipeline = LiteParsedPipeline(nss, *pipeline);
+ }
+ return std::make_unique<DocumentSourceMerge::LiteParsed>(spec.fieldName(),
+ std::move(targetNss),
+ whenMatched,
+ whenNotMatched,
+ std::move(liteParsedPipeline));
}
PrivilegeVector DocumentSourceMerge::LiteParsed::requiredPrivileges(
bool isMongos, bool bypassDocumentValidation) const {
+ invariant(_foreignNss);
auto actions = ActionSet{getDescriptors().at({_whenMatched, _whenNotMatched}).actions};
if (bypassDocumentValidation) {
actions.addAction(ActionType::bypassDocumentValidation);
}
- return {{ResourcePattern::forExactNamespace(_foreignNss), actions}};
+ return {{ResourcePattern::forExactNamespace(*_foreignNss), actions}};
}
DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 808836e0d77..464b4cfdaa0 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/document_source_writer.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
namespace mongo {
@@ -68,21 +69,22 @@ public:
* collection is unsharded. This ensures that the unique index verification happens once on
* mongos and can be bypassed on the shards.
*/
- class LiteParsed final : public LiteParsedDocumentSourceForeignCollection {
+ class LiteParsed final : public LiteParsedDocumentSourceNestedPipelines {
public:
- LiteParsed(NamespaceString foreignNss,
+ LiteParsed(std::string parseTimeName,
+ NamespaceString foreignNss,
MergeWhenMatchedModeEnum whenMatched,
- MergeWhenNotMatchedModeEnum whenNotMatched)
- : LiteParsedDocumentSourceForeignCollection(std::move(foreignNss)),
+ MergeWhenNotMatchedModeEnum whenNotMatched,
+ boost::optional<LiteParsedPipeline> onMatchedPipeline)
+ : LiteParsedDocumentSourceNestedPipelines(
+ std::move(parseTimeName), std::move(foreignNss), std::move(onMatchedPipeline)),
_whenMatched(whenMatched),
_whenNotMatched(whenNotMatched) {}
- using LiteParsedDocumentSourceForeignCollection::LiteParsedDocumentSourceForeignCollection;
-
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
- bool allowedToPassthroughFromMongos() const final {
+ bool allowedToPassthroughFromMongos() const {
return false;
}
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 93122dac504..3cde388224b 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -91,7 +91,7 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa
"Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
targetNss.isValid());
- return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss));
+ return std::make_unique<DocumentSourceOut::LiteParsed>(spec.fieldName(), std::move(targetNss));
}
std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse(
@@ -106,7 +106,7 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa
"Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
targetNss.isValid());
- return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss));
+ return std::make_unique<DocumentSourceOut::LiteParsed>(spec.fieldName(), std::move(targetNss));
}
void DocumentSourceOut::initialize() {
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index d706f4f3ba4..06aa28d908f 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -42,10 +42,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(nss);
+ return std::make_unique<LiteParsed>(spec.fieldName(), nss);
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const override {
// There are no foreign collections.
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 838923e3392..151ea99c58a 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -92,8 +92,8 @@ std::unique_ptr<DocumentSourceUnionWith::LiteParsed> DocumentSourceUnionWith::Li
}
}
- return std::make_unique<DocumentSourceUnionWith::LiteParsed>(std::move(unionNss),
- std::move(liteParsedPipeline));
+ return std::make_unique<DocumentSourceUnionWith::LiteParsed>(
+ spec.fieldName(), std::move(unionNss), std::move(liteParsedPipeline));
}
PrivilegeVector DocumentSourceUnionWith::LiteParsed::requiredPrivileges(
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index 69f1fbaf073..3225a1e6468 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -50,8 +50,11 @@ public:
static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
const BSONElement& spec);
- LiteParsed(NamespaceString foreignNss, boost::optional<LiteParsedPipeline> pipeline)
- : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss), std::move(pipeline)) {}
+ LiteParsed(std::string parseTimeName,
+ NamespaceString foreignNss,
+ boost::optional<LiteParsedPipeline> pipeline)
+ : LiteParsedDocumentSourceNestedPipelines(
+ std::move(parseTimeName), std::move(foreignNss), std::move(pipeline)) {}
PrivilegeVector requiredPrivileges(bool isMongos,
bool bypassDocumentValidation) const override final;
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.cpp b/src/mongo/db/pipeline/lite_parsed_document_source.cpp
index 32c76a8d37e..a21064b8387 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.cpp
@@ -30,20 +30,26 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
-#include "mongo/util/string_map.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
using Parser = LiteParsedDocumentSource::Parser;
namespace {
+
+// Empty vector used by LiteParsedDocumentSources which do not have a sub pipeline.
+inline static std::vector<LiteParsedPipeline> kNoSubPipeline = {};
+
StringMap<Parser> parserMap;
+
} // namespace
void LiteParsedDocumentSource::registerParser(const std::string& name, Parser parser) {
parserMap[name] = parser;
+ // Initialize a counter for this document source to track how many times it is used.
+ aggStageCounters.stageCounterMap[name] = std::make_unique<AggStageCounters::StageCounter>(name);
}
std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse(
@@ -63,14 +69,24 @@ std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse(
return it->second(nss, specElem);
}
+const std::vector<LiteParsedPipeline>& LiteParsedDocumentSource::getSubPipelines() const {
+ return kNoSubPipeline;
+}
+
LiteParsedDocumentSourceNestedPipelines::LiteParsedDocumentSourceNestedPipelines(
- boost::optional<NamespaceString> foreignNss, std::vector<LiteParsedPipeline> pipelines)
- : _foreignNss(std::move(foreignNss)), _pipelines(std::move(pipelines)) {}
+ std::string parseTimeName,
+ boost::optional<NamespaceString> foreignNss,
+ std::vector<LiteParsedPipeline> pipelines)
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _foreignNss(std::move(foreignNss)),
+ _pipelines(std::move(pipelines)) {}
LiteParsedDocumentSourceNestedPipelines::LiteParsedDocumentSourceNestedPipelines(
- boost::optional<NamespaceString> foreignNss, boost::optional<LiteParsedPipeline> pipeline)
- : LiteParsedDocumentSourceNestedPipelines(std::move(foreignNss),
- std::vector<LiteParsedPipeline>{}) {
+ std::string parseTimeName,
+ boost::optional<NamespaceString> foreignNss,
+ boost::optional<LiteParsedPipeline> pipeline)
+ : LiteParsedDocumentSourceNestedPipelines(
+ std::move(parseTimeName), std::move(foreignNss), std::vector<LiteParsedPipeline>{}) {
if (pipeline)
_pipelines.emplace_back(std::move(pipeline.get()));
}
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index bdeb7f7f554..95c28f49510 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -35,6 +35,7 @@
#include <vector>
#include "mongo/db/auth/privilege.h"
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -52,6 +53,9 @@ class LiteParsedPipeline;
*/
class LiteParsedDocumentSource {
public:
+ LiteParsedDocumentSource(std::string parseTimeName)
+ : _parseTimeName(std::move(parseTimeName)) {}
+
virtual ~LiteParsedDocumentSource() = default;
/*
@@ -64,7 +68,6 @@ public:
*/
using Parser = std::function<std::unique_ptr<LiteParsedDocumentSource>(const NamespaceString&,
const BSONElement&)>;
-
/**
* Registers a DocumentSource with a spec parsing function, so that when a stage with the given
* name is encountered, it will call 'parser' to construct that stage's specification object.
@@ -146,6 +149,19 @@ public:
*/
virtual void assertSupportsMultiDocumentTransaction() const {}
+ /**
+ * Returns this document source's subpipelines. If none exist, a reference to an empty vector
+ * is returned.
+ */
+ virtual const std::vector<LiteParsedPipeline>& getSubPipelines() const;
+
+ /**
+ * Returns the name of the stage that this LiteParsedDocumentSource represents.
+ */
+ const std::string& getParseTimeName() const {
+ return _parseTimeName;
+ }
+
protected:
void transactionNotSupported(StringData stageName) const {
uasserted(ErrorCodes::OperationNotSupportedInTransaction,
@@ -175,6 +191,9 @@ protected:
return onlySingleReadConcernSupported(
stageName, repl::ReadConcernLevel::kLocalReadConcern, level);
}
+
+private:
+ std::string _parseTimeName;
};
class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource {
@@ -186,10 +205,11 @@ public:
*/
static std::unique_ptr<LiteParsedDocumentSourceDefault> parse(const NamespaceString& nss,
const BSONElement& spec) {
- return std::make_unique<LiteParsedDocumentSourceDefault>();
+ return std::make_unique<LiteParsedDocumentSourceDefault>(spec.fieldName());
}
- LiteParsedDocumentSourceDefault() = default;
+ LiteParsedDocumentSourceDefault(std::string parseTimeName)
+ : LiteParsedDocumentSource(std::move(parseTimeName)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
@@ -205,8 +225,8 @@ public:
*/
class LiteParsedDocumentSourceForeignCollection : public LiteParsedDocumentSource {
public:
- LiteParsedDocumentSourceForeignCollection(NamespaceString foreignNss)
- : _foreignNss(std::move(foreignNss)) {}
+ LiteParsedDocumentSourceForeignCollection(std::string parseTimeName, NamespaceString foreignNss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _foreignNss(std::move(foreignNss)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return {_foreignNss};
@@ -224,18 +244,24 @@ protected:
*/
class LiteParsedDocumentSourceNestedPipelines : public LiteParsedDocumentSource {
public:
- LiteParsedDocumentSourceNestedPipelines(boost::optional<NamespaceString> foreignNss,
+ LiteParsedDocumentSourceNestedPipelines(std::string parseTimeName,
+ boost::optional<NamespaceString> foreignNss,
std::vector<LiteParsedPipeline> pipelines);
- LiteParsedDocumentSourceNestedPipelines(boost::optional<NamespaceString> foreignNss,
+ LiteParsedDocumentSourceNestedPipelines(std::string parseTimeName,
+ boost::optional<NamespaceString> foreignNss,
boost::optional<LiteParsedPipeline> pipeline);
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final override;
- bool allowedToPassthroughFromMongos() const final override;
+ bool allowedToPassthroughFromMongos() const override;
bool allowShardedForeignCollection(NamespaceString nss) const override;
+ const std::vector<LiteParsedPipeline>& getSubPipelines() const override {
+ return _pipelines;
+ }
+
protected:
boost::optional<NamespaceString> _foreignNss;
std::vector<LiteParsedPipeline> _pipelines;
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
index 954239fa928..153912ef448 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
@@ -110,4 +111,17 @@ void LiteParsedPipeline::verifyIsSupported(
}
}
+void LiteParsedPipeline::tickGlobalStageCounters() const {
+ for (auto&& stage : _stageSpecs) {
+ // Tick counter corresponding to current stage.
+ aggStageCounters.stageCounterMap.find(stage->getParseTimeName())
+ ->second->counter.increment(1);
+
+ // Recursively step through any sub-pipelines.
+ for (auto&& subPipeline : stage->getSubPipelines()) {
+ subPipeline.tickGlobalStageCounters();
+ }
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 7d6ef9d7355..d80a470c9b0 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -157,6 +157,11 @@ public:
return !_stageSpecs.empty() && _stageSpecs.front()->isInitialSource();
}
+ /**
+ * Increments global stage counters corresponding to the stages in this lite parsed pipeline.
+ */
+ void tickGlobalStageCounters() const;
+
private:
std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs;
};
diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp
index 0e5d0595dcd..f4ee4c71676 100644
--- a/src/mongo/db/stats/counters.cpp
+++ b/src/mongo/db/stats/counters.cpp
@@ -231,4 +231,5 @@ OpCounters globalOpCounters;
OpCounters replOpCounters;
NetworkCounter networkCounter;
AuthCounter authCounter;
+AggStageCounters aggStageCounters;
} // namespace mongo
diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h
index edc17eee2bd..36037052cc4 100644
--- a/src/mongo/db/stats/counters.h
+++ b/src/mongo/db/stats/counters.h
@@ -31,12 +31,14 @@
#include <map>
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/jsobj.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
#include "mongo/rpc/message.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/processinfo.h"
+#include "mongo/util/string_map.h"
#include "mongo/util/with_alignment.h"
namespace mongo {
@@ -192,4 +194,20 @@ private:
MechanismMap _mechanisms;
};
extern AuthCounter authCounter;
+
+class AggStageCounters {
+public:
+ // Container for a stage count metric along with its corresponding counter.
+ struct StageCounter {
+ StageCounter(StringData name) : metric("aggStageCounters." + name, &counter) {}
+
+ Counter64 counter;
+ ServerStatusMetricField<Counter64> metric;
+ };
+
+ // Map of aggregation stages to the number of occurrences.
+ StringMap<std::unique_ptr<StageCounter>> stageCounterMap = {};
+};
+
+extern AggStageCounters aggStageCounters;
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 29ed31f4393..a2f335ee270 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/executor/task_executor_pool.h"
@@ -507,6 +508,15 @@ private:
debug.additiveMetrics.nMatched =
response.getN() - (debug.upsert ? response.sizeUpsertDetails() : 0);
debug.additiveMetrics.nModified = response.getNModified();
+ for (auto&& update : _batchedRequest.getUpdateRequest().getUpdates()) {
+ // If this was a pipeline style update, record which stages were being used.
+ auto updateMod = update.getU();
+ if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) {
+ auto pipeline = LiteParsedPipeline(_batchedRequest.getNS(),
+ updateMod.getUpdatePipeline());
+ pipeline.tickGlobalStageCounters();
+ }
+ }
break;
case BatchedCommandRequest::BatchType_Delete:
for (size_t i = 0; i < numAttempts; ++i) {
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index b19183096e6..61c3d034f9b 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -345,9 +345,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
MONGO_UNREACHABLE;
}();
- if (status.isOK())
+ if (status.isOK()) {
updateHostsTargetedMetrics(opCtx, namespaces.executionNss, routingInfo, involvedNamespaces);
-
+ // Report usage statistics for each stage in the pipeline.
+ liteParsedPipeline.tickGlobalStageCounters();
+ }
return status;
}