summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2020-01-29 15:13:38 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-28 19:06:47 +0000
commit3e49ba9571c3513f7aef3135b5923c83a76344e2 (patch)
tree7e9aefa30ada56913351d4a11aac07190988e2f5
parente3ee1df650f6a38bca6691c7b9cba1c5fbc56a11 (diff)
downloadmongo-3e49ba9571c3513f7aef3135b5923c83a76344e2.tar.gz
SERVER-44689 Add serverStatus counter for each use of an aggregation stage in a user's request
(cherry picked from commit c54a777a4a154984f5595b11993d7d009350a38c) (cherry picked from commit 08266fc2ad15e2cba4af79e58a83e822e7c540dc)
-rw-r--r--jstests/noPassthrough/server_status_aggregation_stage_counter.js144
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp5
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h6
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h5
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h6
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h11
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h5
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_cursors.h5
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h7
-rw-r--r--src/mongo/db/pipeline/document_source_list_sessions.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h21
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp4
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.cpp17
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h41
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h18
-rw-r--r--src/mongo/db/stats/counters.cpp3
-rw-r--r--src/mongo/db/stats/counters.h21
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp10
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp8
24 files changed, 323 insertions, 49 deletions
diff --git a/jstests/noPassthrough/server_status_aggregation_stage_counter.js b/jstests/noPassthrough/server_status_aggregation_stage_counter.js
new file mode 100644
index 00000000000..95069f30c0f
--- /dev/null
+++ b/jstests/noPassthrough/server_status_aggregation_stage_counter.js
@@ -0,0 +1,144 @@
+/**
+ * Tests for serverStatus metrics.stage stats.
+ * @tags: [requires_sharding]
+ */
+(function() {
+ "use strict";
+
+ // In memory map of stage names to their counters. Used to verify that serverStatus is
+ // incrementing the appropriate stages correctly across multiple pipelines.
+ let countersWeExpectToIncreaseMap = {};
+
+ function checkCounters(
+ command, countersWeExpectToIncrease, countersWeExpectNotToIncrease = []) {
+ // Capture the pre-aggregation counts of the stages which we expect not to increase.
+ let metrics = db.serverStatus().metrics.aggStageCounters;
+ let noIncreaseCounterMap = {};
+ for (let stage of countersWeExpectNotToIncrease) {
+ noIncreaseCounterMap[stage] = metrics[stage];
+ }
+
+ // Update in memory map to reflect what each counter's count should be after running
+ // 'command'.
+ for (let stage of countersWeExpectToIncrease) {
+ if (!countersWeExpectToIncreaseMap[stage]) {
+ countersWeExpectToIncreaseMap[stage] = 0;
+ }
+ countersWeExpectToIncreaseMap[stage]++;
+ }
+
+ // Run the command and update metrics to reflect the post-command serverStatus state.
+ command();
+ metrics = db.serverStatus().metrics.aggStageCounters;
+ // Verify that serverStatus reflects expected counters.
+ for (let stage of countersWeExpectToIncrease) {
+ assert.eq(metrics[stage], countersWeExpectToIncreaseMap[stage]);
+ }
+
+ // Verify that the counters which we expect not to increase did not do so.
+ for (let stage of countersWeExpectNotToIncrease) {
+ assert.eq(metrics[stage], noIncreaseCounterMap[stage]);
+ }
+ }
+
+ function runTests(db, coll) {
+ // Reset our counter map before running any aggregations.
+ countersWeExpectToIncreaseMap = {};
+
+ // Setup for agg stages which have nested pipelines.
+ assert.commandWorked(coll.insert([
+ {"_id": 1, "item": "almonds", "price": 12, "quantity": 2},
+ {"_id": 2, "item": "pecans", "price": 20, "quantity": 1},
+ {"_id": 3}
+ ]));
+
+ assert.commandWorked(db.inventory.insert([
+ {"_id": 1, "sku": "almonds", description: "product 1", "instock": 120},
+ {"_id": 2, "sku": "bread", description: "product 2", "instock": 80},
+ {"_id": 3, "sku": "cashews", description: "product 3", "instock": 60},
+ {"_id": 4, "sku": "pecans", description: "product 4", "instock": 70},
+ {"_id": 5, "sku": null, description: "Incomplete"},
+ {"_id": 6}
+ ]));
+
+ // $skip
+ checkCounters(() => coll.aggregate([{$skip: 5}]).toArray(), ['$skip']);
+ // $project is an alias for $unset.
+ checkCounters(() => coll.aggregate([{$project: {title: 1, author: 1}}]).toArray(),
+ ['$project'],
+ ['$unset']);
+ // $count is an alias for $project and $group.
+ checkCounters(
+ () => coll.aggregate([{$count: "test"}]).toArray(), ['$count'], ['$project', '$group']);
+
+ // $lookup
+ checkCounters(() => coll.aggregate([{
+ $lookup: {
+ from: "inventory",
+ pipeline: [{$match: {inStock: 70}}],
+ as: "inventory_docs"
+ }
+ }])
+ .toArray(),
+ ['$lookup', "$match"]);
+
+ // $facet
+ checkCounters(() => coll.aggregate([{
+ $facet: {
+ "a": [{$match: {price: {$exists: 1}}}],
+ "b": [{$project: {title: 1}}]
+ }
+ }])
+ .toArray(),
+ ['$facet', '$match', "$project"]);
+
+ // Verify that explain ticks counters.
+ checkCounters(() => coll.explain().aggregate([{$match: {a: 5}}]), ["$match"]);
+
+ // Verify that a stage which appears multiple times in a pipeline has an accurate count.
+ checkCounters(() => coll.aggregate([
+ {
+ $facet: {
+ "a": [{$match: {price: {$exists: 1}}}],
+ "b": [{$project: {title: 1}}]
+ }
+ },
+ {
+ $facet: {
+ "c": [{$match: {instock: {$exists: 1}}}],
+ "d": [{$project: {title: 0}}]
+ }
+ }
+ ])
+ .toArray(),
+ ["$facet", "$match", "$project", "$facet", "$match", "$project"]);
+
+ // Verify that a pipeline used in a view ticks counters.
+ const viewName = "counterView";
+ assert.commandWorked(db.createView(viewName, coll.getName(), [{"$project": {_id: 0}}]));
+ // Note that $project's counter will also be ticked since the $project used to generate the
+ // view and will be stitched together with the pipeline specified to the aggregate command.
+ checkCounters(() => db[viewName].aggregate([{$match: {a: 5}}]).toArray(),
+ ["$match", "$project"]);
+ }
+
+ // Standalone
+ const conn = MongoRunner.runMongod();
+ assert.neq(null, conn, "mongod was unable to start up");
+ let db = conn.getDB(jsTest.name());
+ const collName = jsTest.name();
+ let coll = db[collName];
+ runTests(db, coll);
+
+ MongoRunner.stopMongod(conn);
+
+ // Sharded cluster
+ const st = new ShardingTest({shards: 2});
+ db = st.s.getDB(jsTest.name());
+ coll = db[collName];
+ st.shardColl(coll.getFullName(), {_id: 1}, {_id: "hashed"});
+
+ runTests(db, coll);
+
+ st.stop();
+}());
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f9e07a9b379..9daa28c57b6 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -336,9 +336,9 @@ Status runAggregate(OperationContext* opCtx,
unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
boost::intrusive_ptr<ExpressionContext> expCtx;
Pipeline* unownedPipeline;
+ const LiteParsedPipeline liteParsedPipeline(request);
auto curOp = CurOp::get(opCtx);
{
- const LiteParsedPipeline liteParsedPipeline(request);
try {
// Check whether the parsed pipeline supports the given read concern.
@@ -558,6 +558,9 @@ Status runAggregate(OperationContext* opCtx,
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
+ // Report usage statistics for each stage in the pipeline.
+ liteParsedPipeline.tickGlobalStageCounters();
+
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
Explain::explainPipelineExecutor(
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index fb911760111..bf753f4be96 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -256,6 +256,8 @@ env.Library(
'lite_parsed_document_source.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/commands/server_status_core',
+ '$BUILD_DIR/mongo/db/stats/counters',
'aggregation_request',
]
)
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 92d2ed6ffdf..bd3aa246baf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -49,10 +49,12 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec);
+ return stdx::make_unique<LiteParsed>(
+ spec.fieldName(), request.getNamespaceString(), spec);
}
- explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) {
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss, BSONElement spec)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {
// We don't do any validation here, just a minimal check for the resume token. We also
// do not need to extract the token unless the stream is running on a single namespace.
if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) {
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index b71898dbca9..2e41e825df6 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -44,10 +44,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString());
+ return std::make_unique<LiteParsed>(spec.fieldName(), request.getNamespaceString());
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
bool isCollStats() const final {
return true;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index adede47959b..18030e7fbbc 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -99,7 +99,8 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li
}
}
- return stdx::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps);
+ return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(
+ spec.fieldName(), allUsers, localOps);
}
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index f189325e93c..de2b0dc5918 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -49,8 +49,10 @@ public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
- LiteParsed(UserMode allUsers, LocalOpsMode localOps)
- : _allUsers(allUsers), _localOps(localOps) {}
+ LiteParsed(std::string parseTimeName, UserMode allUsers, LocalOpsMode localOps)
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _allUsers(allUsers),
+ _localOps(localOps) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 577b156ef6d..9e2f13bf07d 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -129,8 +129,8 @@ std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed
pipeline.requiredPrivileges(unusedIsMongosFlag));
}
- return stdx::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines),
- std::move(requiredPrivileges));
+ return stdx::make_unique<DocumentSourceFacet::LiteParsed>(
+ spec.fieldName(), std::move(liteParsedPipelines), std::move(requiredPrivileges));
}
stdx::unordered_set<NamespaceString> DocumentSourceFacet::LiteParsed::getInvolvedNamespaces()
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 3f98fe1bdfa..1d1ab36b163 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -72,8 +72,11 @@ public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
- LiteParsed(std::vector<LiteParsedPipeline> liteParsedPipelines, PrivilegeVector privileges)
- : _liteParsedPipelines(std::move(liteParsedPipelines)),
+ LiteParsed(std::string parseTimeName,
+ std::vector<LiteParsedPipeline> liteParsedPipelines,
+ PrivilegeVector privileges)
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _liteParsedPipelines(std::move(liteParsedPipelines)),
_requiredPrivileges(std::move(privileges)) {}
PrivilegeVector requiredPrivileges(bool isMongos) const final {
@@ -82,6 +85,10 @@ public:
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final;
+ const std::vector<LiteParsedPipeline>& getSubPipelines() const final {
+ return _liteParsedPipelines;
+ }
+
private:
const std::vector<LiteParsedPipeline> _liteParsedPipelines;
const PrivilegeVector _requiredPrivileges;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index f22eebb0f04..6e9c4776df4 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -76,8 +76,8 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceGraphL
PrivilegeVector privileges{
Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)};
- return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(nss),
- std::move(privileges));
+ return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(
+ spec.fieldName(), std::move(nss), std::move(privileges));
}
REGISTER_DOCUMENT_SOURCE(graphLookup,
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 51d92262b0f..5700f61d238 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -45,10 +45,11 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString());
+ return std::make_unique<LiteParsed>(spec.fieldName(), request.getNamespaceString());
}
- explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
+ explicit LiteParsed(std::string parseTimeName, NamespaceString nss)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _nss(std::move(nss)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h
index 62e8c237afe..103990f27a0 100644
--- a/src/mongo/db/pipeline/document_source_list_local_cursors.h
+++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h
@@ -52,9 +52,12 @@ public:
class LiteParsed final : public LiteParsedDocumentSource {
public:
+ explicit LiteParsed(std::string parseTimeName)
+ : LiteParsedDocumentSource(std::move(parseTimeName)) {}
+
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>();
+ return stdx::make_unique<LiteParsed>(spec.fieldName());
}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 787b537b515..cdabd1ae443 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -59,11 +59,12 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
-
- return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec));
+ return stdx::make_unique<LiteParsed>(spec.fieldName(),
+ listSessionsParseSpec(kStageName, spec));
}
- explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
+ explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index aa7400bad0c..185ede0daa2 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -54,10 +54,12 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec));
+ return stdx::make_unique<LiteParsed>(spec.fieldName(),
+ listSessionsParseSpec(kStageName, spec));
}
- explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
+ explicit LiteParsed(std::string parseTimeName, const ListSessionsSpec& spec)
+ : LiteParsedDocumentSource(std::move(parseTimeName)), _spec(spec) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 2646d125d32..ba175032f79 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -167,20 +167,20 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
// Recursively lite parse the nested pipeline, if one exists.
auto pipelineElem = specObj["pipeline"];
- boost::optional<LiteParsedPipeline> liteParsedPipeline;
+ std::vector<LiteParsedPipeline> liteParsedPipelineVector;
if (pipelineElem) {
auto pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem));
AggregationRequest foreignAggReq(fromNss, std::move(pipeline));
- liteParsedPipeline = LiteParsedPipeline(foreignAggReq);
-
- auto pipelineInvolvedNamespaces = liteParsedPipeline->getInvolvedNamespaces();
+ LiteParsedPipeline liteParsedPipeline(foreignAggReq);
+ auto pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
foreignNssSet.insert(pipelineInvolvedNamespaces.begin(), pipelineInvolvedNamespaces.end());
+ liteParsedPipelineVector.push_back(std::move(liteParsedPipeline));
}
-
foreignNssSet.insert(fromNss);
-
- return stdx::make_unique<DocumentSourceLookUp::LiteParsed>(
- std::move(fromNss), std::move(foreignNssSet), std::move(liteParsedPipeline));
+ return stdx::make_unique<DocumentSourceLookUp::LiteParsed>(spec.fieldName(),
+ std::move(fromNss),
+ std::move(foreignNssSet),
+ std::move(liteParsedPipelineVector));
}
REGISTER_DOCUMENT_SOURCE(lookup,
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index aa8ecf50768..d778162a194 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -56,10 +56,12 @@ public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
- LiteParsed(NamespaceString fromNss,
+ LiteParsed(std::string parseTimeName,
+ NamespaceString fromNss,
stdx::unordered_set<NamespaceString> foreignNssSet,
- boost::optional<LiteParsedPipeline> liteParsedPipeline)
- : _fromNss{std::move(fromNss)},
+ std::vector<LiteParsedPipeline> liteParsedPipeline)
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _fromNss{std::move(fromNss)},
_foreignNssSet(std::move(foreignNssSet)),
_liteParsedPipeline(std::move(liteParsedPipeline)) {}
@@ -73,18 +75,25 @@ public:
&requiredPrivileges,
Privilege(ResourcePattern::forExactNamespace(_fromNss), ActionType::find));
- if (_liteParsedPipeline) {
+ if (!_liteParsedPipeline.empty()) {
+ invariant(_liteParsedPipeline.size() == 1);
Privilege::addPrivilegesToPrivilegeVector(
- &requiredPrivileges, _liteParsedPipeline->requiredPrivileges(isMongos));
+ &requiredPrivileges, _liteParsedPipeline[0].requiredPrivileges(isMongos));
}
return requiredPrivileges;
}
+ const std::vector<LiteParsedPipeline>& getSubPipelines() const override {
+ return _liteParsedPipeline;
+ }
+
private:
const NamespaceString _fromNss;
const stdx::unordered_set<NamespaceString> _foreignNssSet;
- const boost::optional<LiteParsedPipeline> _liteParsedPipeline;
+ // Even though this will only ever hold 1 element, it is stored in a vector to satisfy
+ // 'getSubPipelines'.
+ const std::vector<LiteParsedPipeline> _liteParsedPipeline;
};
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 247dee7809f..654ee1a136b 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -70,8 +70,8 @@ std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::l
PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)};
- return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(std::move(targetNss),
- std::move(privileges));
+ return stdx::make_unique<LiteParsedDocumentSourceForeignCollections>(
+ spec.fieldName(), std::move(targetNss), std::move(privileges));
}
REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::liteParse, DocumentSourceOut::createFromBson);
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.cpp b/src/mongo/db/pipeline/lite_parsed_document_source.cpp
index 882758b9644..22d764d37e3 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.cpp
@@ -32,18 +32,28 @@
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/util/string_map.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
using Parser = LiteParsedDocumentSource::Parser;
namespace {
+
+// Empty vector used by LiteParsedDocumentSources which do not have a sub pipeline.
+std::vector<LiteParsedPipeline> kNoSubPipeline = {};
+
StringMap<Parser> parserMap;
} // namespace
void LiteParsedDocumentSource::registerParser(const std::string& name, Parser parser) {
parserMap[name] = parser;
+ // Initialize a counter for this document source to track how many times it is used.
+ invariant(
+ aggStageCounters.stageCounterMap
+ .insert(std::make_pair(name, std::make_unique<AggStageCounters::StageCounter>(name)))
+ .second);
}
std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse(
@@ -62,4 +72,9 @@ std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse(
return it->second(request, specElem);
}
+
+
+const std::vector<LiteParsedPipeline>& LiteParsedDocumentSource::getSubPipelines() const {
+ return kNoSubPipeline;
}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index c3173fb8212..84b7d4be557 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -44,6 +44,8 @@
namespace mongo {
+class LiteParsedPipeline;
+
/**
* A lightly parsed version of a DocumentSource. It is not executable and not guaranteed to return a
* parse error when encountering an invalid specification. Instead, the purpose of this class is to
@@ -52,6 +54,9 @@ namespace mongo {
*/
class LiteParsedDocumentSource {
public:
+ LiteParsedDocumentSource(std::string parseTimeName)
+ : _parseTimeName(std::move(parseTimeName)) {}
+
virtual ~LiteParsedDocumentSource() = default;
/*
@@ -142,6 +147,22 @@ public:
* UserException if not compatible.
*/
virtual void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {}
+
+ /**
+ * Returns this document source's subpipelines. If none exist, a reference to an empty vector
+ * is returned.
+ */
+ virtual const std::vector<LiteParsedPipeline>& getSubPipelines() const;
+
+ /**
+ * Returns the name of the stage that this LiteParsedDocumentSource represents.
+ */
+ const std::string& getParseTimeName() const {
+ return _parseTimeName;
+ }
+
+private:
+ std::string _parseTimeName;
};
class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource {
@@ -153,10 +174,11 @@ public:
*/
static std::unique_ptr<LiteParsedDocumentSourceDefault> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsedDocumentSourceDefault>();
+ return std::make_unique<LiteParsedDocumentSourceDefault>(spec.fieldName());
}
- LiteParsedDocumentSourceDefault() = default;
+ LiteParsedDocumentSourceDefault(std::string parseTimeName)
+ : LiteParsedDocumentSource(std::move(parseTimeName)) {}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return stdx::unordered_set<NamespaceString>();
@@ -172,13 +194,20 @@ public:
*/
class LiteParsedDocumentSourceForeignCollections : public LiteParsedDocumentSource {
public:
- LiteParsedDocumentSourceForeignCollections(NamespaceString foreignNss,
+ LiteParsedDocumentSourceForeignCollections(std::string parseTimeName,
+ NamespaceString foreignNss,
PrivilegeVector privileges)
- : _foreignNssSet{std::move(foreignNss)}, _requiredPrivileges(std::move(privileges)) {}
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _foreignNssSet{std::move(foreignNss)},
+ _requiredPrivileges(std::move(privileges)) {}
- LiteParsedDocumentSourceForeignCollections(stdx::unordered_set<NamespaceString> foreignNssSet,
+ LiteParsedDocumentSourceForeignCollections(std::string parseTimeName,
+ stdx::unordered_set<NamespaceString> foreignNssSet,
PrivilegeVector privileges)
- : _foreignNssSet(std::move(foreignNssSet)), _requiredPrivileges(std::move(privileges)) {}
+ : LiteParsedDocumentSource(std::move(parseTimeName)),
+ _foreignNssSet(std::move(foreignNssSet)),
+ _requiredPrivileges(std::move(privileges)) {}
+
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
return {_foreignNssSet};
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index c717c20f275..69e56443a22 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
@@ -152,6 +153,23 @@ public:
}
}
+ /**
+ * Increments global stage counters corresponding to the stages in this lite parsed pipeline.
+ */
+ void tickGlobalStageCounters() const {
+ for (auto&& stage : _stageSpecs) {
+ // Tick counter corresponding to current stage.
+ auto entry = aggStageCounters.stageCounterMap.find(stage->getParseTimeName());
+ invariant(entry != aggStageCounters.stageCounterMap.end());
+ entry->second->counter.increment(1);
+
+ // Recursively step through any sub-pipelines.
+ for (auto&& subPipeline : stage->getSubPipelines()) {
+ subPipeline.tickGlobalStageCounters();
+ }
+ }
+ }
+
private:
std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs;
NamespaceString _nss;
diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp
index af695566316..6e8bbfdadb2 100644
--- a/src/mongo/db/stats/counters.cpp
+++ b/src/mongo/db/stats/counters.cpp
@@ -205,4 +205,5 @@ void NetworkCounter::append(BSONObjBuilder& b) {
OpCounters globalOpCounters;
OpCounters replOpCounters;
NetworkCounter networkCounter;
-}
+AggStageCounters aggStageCounters;
+} // namespace mongo
diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h
index e9742797100..e30efaca25a 100644
--- a/src/mongo/db/stats/counters.h
+++ b/src/mongo/db/stats/counters.h
@@ -31,6 +31,8 @@
#pragma once
+#include "mongo/base/counter.h"
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/jsobj.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
@@ -125,4 +127,21 @@ private:
};
extern NetworkCounter networkCounter;
-}
+
+class AggStageCounters {
+public:
+ // Container for a stage count metric along with its corresponding counter.
+ struct StageCounter {
+ explicit StageCounter(const std::string& name)
+ : metric("aggStageCounters." + name, &counter) {}
+
+ Counter64 counter;
+ ServerStatusMetricField<Counter64> metric;
+ };
+
+ // Map of aggregation stages to the number of occurrences.
+ stdx::unordered_map<std::string, std::unique_ptr<StageCounter>> stageCounterMap;
+};
+
+extern AggStageCounters aggStageCounters;
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 0fab00f2d19..7262819e5d1 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -1129,8 +1129,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
nsStruct.requestedNss = namespaces.requestedNss;
nsStruct.executionNss = resolvedView->getNamespace();
- return ClusterAggregate::runAggregate(
+ auto viewStatus = ClusterAggregate::runAggregate(
opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out);
+ if (viewStatus.isOK()) {
+ // If view execution succeeded, count the stages that are part of the view definition.
+ AggregationRequest viewPipeline{resolvedView->getNamespace(),
+ resolvedView->getPipeline()};
+ LiteParsedPipeline viewLiteParsed(viewPipeline);
+ viewLiteParsed.tickGlobalStageCounters();
+ }
+ return viewStatus;
}
return status;
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index f3b4fa36e71..b09d02b522f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/s/commands/cluster_aggregate.h"
namespace mongo {
@@ -106,8 +107,13 @@ private:
const auto& nss = aggregationRequest.getNamespaceString();
- return ClusterAggregate::runAggregate(
+ auto status = ClusterAggregate::runAggregate(
opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, cmdObj, result);
+ if (status.isOK()) {
+ LiteParsedPipeline lpp(aggregationRequest);
+ lpp.tickGlobalStageCounters();
+ }
+ return status;
}
} clusterPipelineCmd;