summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-11-15 17:31:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-15 18:28:31 +0000
commita2f485cd414f38a3050267abbe10c907c2e6e106 (patch)
tree25d01cd7195c0e20300b70e11e4574dcc7469de4
parentd9c4ec0cec878db9d785ef4ff58d69c34247b9ed (diff)
downloadmongo-a2f485cd414f38a3050267abbe10c907c2e6e106.tar.gz
SERVER-71141 Support sampling nested aggregate queries on sharded clusters
-rw-r--r--jstests/sharding/analyze_shard_key/libs/query_sampling_util.js49
-rw-r--r--jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js305
-rw-r--r--jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js143
-rw-r--r--jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js103
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp41
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp36
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.h10
-rw-r--r--src/mongo/db/pipeline/document_source_union_with_test.cpp34
-rw-r--r--src/mongo/db/pipeline/expression_context.h11
-rw-r--r--src/mongo/db/pipeline/process_interface/SConscript2
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp11
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp4
-rw-r--r--src/mongo/s/SConscript16
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp21
16 files changed, 772 insertions, 24 deletions
diff --git a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
index 51bc566f36c..10c9e8da807 100644
--- a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
+++ b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
@@ -27,16 +27,30 @@ var QuerySamplingUtil = (function() {
}
/**
- * Waits for the given mongos to have one active collection for query sampling.
+ * Waits for the given node to have one active collection for query sampling.
*/
- function waitForActiveSampling(mongosConn) {
+ function waitForActiveSampling(node) {
assert.soon(() => {
- const res = assert.commandWorked(mongosConn.adminCommand({serverStatus: 1}));
+ const res = assert.commandWorked(node.adminCommand({serverStatus: 1}));
return res.queryAnalyzers.activeCollections == 1;
});
}
/**
+ * Waits for all shard nodes to have one active collection for query sampling.
+ */
+ function waitForActiveSamplingOnAllShards(st) {
+ st._rs.forEach(rs => {
+ rs.nodes.forEach(node => {
+ assert.soon(() => {
+ const res = assert.commandWorked(node.adminCommand({serverStatus: 1}));
+ return res.queryAnalyzers.activeCollections == 1;
+ });
+ });
+ });
+ }
+
+ /**
* Returns true if 'subsetObj' is a sub object of 'supersetObj'. That is, every key that exists
* in 'subsetObj' also exists in 'supersetObj' and the values of that key in the two objects are
* equal.
@@ -133,18 +147,25 @@ var QuerySamplingUtil = (function() {
{actualSampledQueryDocs, expectedSampledQueryDocs});
for (let {filter, shardNames, cmdName, cmdObj, diff} of expectedSampledQueryDocs) {
+ if (!filter) {
+ // The filer is not specified so skip verifying it.
+ continue;
+ }
let shardName = null;
for (let rs of st._rs) {
const primary = rs.test.getPrimary();
- const queryDoc = primary.getCollection(kSampledQueriesNs).findOne(filter);
+ const queryDocs = primary.getCollection(kSampledQueriesNs).find(filter).toArray();
if (shardName) {
- assert.eq(queryDoc,
- null,
+ assert.eq(queryDocs.length,
+ 0,
"Found a sampled query on more than one shard " +
tojson({shardNames: [shardName, rs.test.name], cmdName, cmdObj}));
continue;
- } else if (queryDoc) {
+ } else if (queryDocs.length > 0) {
+ assert.eq(queryDocs.length, 1, queryDocs);
+ const queryDoc = queryDocs[0];
+
shardName = rs.test.name;
assert(shardNames.includes(shardName),
"Found a sampled query on an unexpected shard " +
@@ -170,6 +191,18 @@ var QuerySamplingUtil = (function() {
assert.eq(coll.find({ns}).itcount(), 0);
}
+ function clearSampledQueryCollection(primary) {
+ const coll = primary.getCollection(kSampledQueriesNs);
+ assert.commandWorked(coll.remove({}));
+ }
+
+ function clearSampledQueryCollectionOnAllShards(st) {
+ for (let rs of st._rs) {
+ const primary = rs.test.getPrimary();
+ clearSampledQueryCollection(primary);
+ }
+ }
+
/**
* Waits for the config.sampledQueriesDiff collection to have a document with _id equal to
* 'sampleId' for the collection 'ns', and then asserts that the diff in that document matches
@@ -210,9 +243,11 @@ var QuerySamplingUtil = (function() {
generateRandomCollation,
makeCmdObjIgnoreSessionInfo,
waitForActiveSampling,
+ waitForActiveSamplingOnAllShards,
assertSoonSampledQueryDocuments,
assertSoonSampledQueryDocumentsAcrossShards,
assertNoSampledQueryDocuments,
+ clearSampledQueryCollectionOnAllShards,
assertSoonSingleSampledDiffDocument,
assertNoSampledDiffDocuments,
clearSampledDiffCollection
diff --git a/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js b/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js
new file mode 100644
index 00000000000..38a39b3ed2e
--- /dev/null
+++ b/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js
@@ -0,0 +1,305 @@
+/**
+ * Utilities for testing basic support for sampling nested aggregate queries (i.e. ones inside
+ * $lookup, $graphLookup, $unionWith) on a sharded cluster.
+ */
+
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
+
+// Make the periodic jobs for refreshing sample rates and writing sampled queries and diffs have a
+// period of 1 second to speed up the test.
+const queryAnalysisSamplerConfigurationRefreshSecs = 1;
+const queryAnalysisWriterIntervalSecs = 1;
+
+const outerAggTestCases = [
+ // The test cases for singly-nested aggregate queries.
+ {
+ name: "lookup_custom_pipeline",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{$lookup: {from: foreignCollName, as: "joined", pipeline}}];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "lookup_non_custom_pipeline",
+ supportCustomPipeline: false,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [
+ {$lookup: {from: foreignCollName, as: "joined", localField: "a", foreignField: "x"}}
+ ];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => {
+ // When SBE is enabled, if the collection is not sharded and not clustered, the shard
+ // will not create a separate pipeline to execute the inner side of a $lookup stage so
+ // there is no nested aggregate query to route.
+ const listCollectionRes =
+ assert.commandWorked(db.runCommand({listCollections: 1, filter: {name: collName}}));
+ const isClusteredColl =
+ listCollectionRes.cursor.firstBatch[0].options.hasOwnProperty("clusteredIndex");
+ const isEligibleForSBELookupPushdown =
+ checkSBEEnabled(db) && !isShardedColl && !isClusteredColl;
+ return !isEligibleForSBELookupPushdown;
+ }
+ },
+ {
+ name: "unionWith",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{$unionWith: {coll: foreignCollName, pipeline}}];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "graphLookup",
+ supportCustomPipeline: false,
+ makeOuterPipelineFunc: (localCollName, foreignCollName) => {
+ return [{
+ $graphLookup: {
+ from: foreignCollName,
+ startWith: "$x",
+ connectFromField: "x",
+ connectToField: "a",
+ maxDepth: 1,
+ as: "connected"
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ // The test cases for doubly-nested aggregate queries.
+ {
+ name: "lookup+lookup",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $lookup: {
+ from: localCollName,
+ as: "joined",
+ pipeline: [{
+ $lookup: {
+ from: foreignCollName,
+ as: "joined",
+ pipeline
+ }
+ }]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "lookup+unionWith",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $lookup: {
+ from: localCollName,
+ as: "joined",
+ pipeline: [{
+ $unionWith: {
+ coll: foreignCollName,
+ pipeline
+ }
+ }]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "lookup+graphLookUp",
+ supportCustomPipeline: false,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $lookup: {
+ from: localCollName,
+ as: "joined",
+ pipeline: [{
+ $graphLookup: {
+ from: foreignCollName,
+ startWith: "$x",
+ connectFromField: "x",
+ connectToField: "a",
+ maxDepth: 1,
+ as: "connected"
+ }
+ }]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "unionWith+lookup",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $unionWith: {
+ coll: localCollName,
+ pipeline: [{$lookup: {from: foreignCollName, as: "joined", pipeline}}]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "unionWith+unionWith",
+ supportCustomPipeline: true,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $unionWith: {
+ coll: localCollName,
+ pipeline: [{$unionWith: {coll: foreignCollName, pipeline}}]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true
+ },
+ {
+ name: "unionWith+graphLookup",
+ supportCustomPipeline: false,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{
+ $unionWith: {
+ coll: localCollName,
+ pipeline: [{
+ $graphLookup: {
+ from: foreignCollName,
+ startWith: "$x",
+ connectFromField: "x",
+ connectToField: "a",
+ maxDepth: 1,
+ as: "connected"
+ }
+ }]
+ }
+ }];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => true,
+ },
+ {
+ name: "facet",
+ supportCustomPipeline: false,
+ makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => {
+ return [{$facet: {foo: [{$match: {}}]}}];
+ },
+ requireShardToRouteFunc: (db, collName, isShardedColl) => false,
+ }
+];
+
+const innerAggTestCases = [
+ {
+ // The filter is in the first stage.
+ containInitialFilter: true,
+ makeInnerPipelineFunc: (filter) => {
+ return [{$match: filter}];
+ }
+ },
+ {
+ // The filter is not in the first stage but the stage that it is in is moveable.
+ containInitialFilter: true,
+ makeInnerPipelineFunc: (filter) => {
+ return [{$sort: {x: -1}}, {$match: filter}];
+ }
+ },
+ {
+ // The filter is not in the first stage and the stage that it is in is not moveable.
+ containInitialFilter: false,
+ makeInnerPipelineFunc: (filter) => {
+ return [{$_internalInhibitOptimization: {}}, {$match: filter}];
+ }
+ }
+];
+
+/**
+ * Tests that a nested aggregate query run internally by an aggregation stage that takes in a
+ * "pipeline" is sampled correctly.
+ */
+function testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter,
+ shardNames,
+ explain,
+ requireShardToRoute) {
+ const mongosDB = st.s.getDB(dbName);
+ const foreignNs = dbName + "." + foreignCollName;
+ const foreignCollUuid = QuerySamplingUtil.getCollectionUuid(mongosDB, foreignCollName);
+
+ let expectedSampledQueryDocs = [];
+
+ const collation = QuerySamplingUtil.generateRandomCollation();
+ const innerPipeline = makeInnerPipelineFunc(filter);
+ const outerPipeline = makeOuterPipelineFunc(localCollName, foreignCollName, innerPipeline);
+ const originalCmdObj =
+ {aggregate: localCollName, pipeline: outerPipeline, collation, cursor: {}};
+
+ jsTest.log("Testing command " + tojsononeline({originalCmdObj, explain}));
+ assert.commandWorked(mongosDB.runCommand(explain ? {explain: originalCmdObj} : originalCmdObj));
+
+ // Queries that are part of an 'explain' or that do not require shards to route should not get
+ // sampled.
+ if (!explain && requireShardToRoute) {
+ const expectedFilter = containInitialFilter ? filter : {};
+ const expectedDoc = {
+ cmdName: "aggregate",
+ cmdObj: {filter: expectedFilter, collation},
+ shardNames
+ };
+ if (filter) {
+ expectedDoc.filter = {"cmd.filter": expectedFilter};
+ }
+ expectedSampledQueryDocs.push(expectedDoc);
+ }
+
+ sleep(queryAnalysisWriterIntervalSecs * 1000);
+ QuerySamplingUtil.assertSoonSampledQueryDocumentsAcrossShards(
+ st, foreignNs, foreignCollUuid, ["aggregate"], expectedSampledQueryDocs);
+ QuerySamplingUtil.clearSampledQueryCollectionOnAllShards(st);
+}
+
+/**
+ * Tests that a nested aggregate query run internally by an aggregation stage that does not take in
+ * a "pipeline" is sampled correctly.
+ */
+function testNoCustomInnerPipeline(makeOuterPipelineFunc,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ explain,
+ requireShardToRoute) {
+ const mongosDB = st.s.getDB(dbName);
+ const foreignNs = dbName + "." + foreignCollName;
+ const foreignCollUuid = QuerySamplingUtil.getCollectionUuid(mongosDB, foreignCollName);
+
+ let expectedSampledQueryDocs = [];
+
+ const collation = QuerySamplingUtil.generateRandomCollation();
+ const outerPipeline = makeOuterPipelineFunc(localCollName, foreignCollName);
+ const originalCmdObj =
+ {aggregate: localCollName, pipeline: outerPipeline, collation, cursor: {}};
+
+ jsTest.log("Testing command " + tojsononeline({originalCmdObj, explain}));
+ assert.commandWorked(mongosDB.runCommand(explain ? {explain: originalCmdObj} : originalCmdObj));
+
+ // Queries that are part of an 'explain' or that do not require shards to route should not get
+ // sampled.
+ if (!explain && requireShardToRoute) {
+ // Out of the aggregation stages above, the only stages that doesn't take in a custom
+ // pipeline are $graphLookup and $lookup (without a "pipeline" field). To avoid relying on
+ // the current format of the $match filter that they internally construct, skip verifying
+ // the filter and only verify that the query is present in the config.sampledQueries
+ // collection.
+ expectedSampledQueryDocs.push({});
+ }
+
+ sleep(queryAnalysisWriterIntervalSecs * 1000);
+ QuerySamplingUtil.assertSoonSampledQueryDocumentsAcrossShards(
+ st, foreignNs, foreignCollUuid, ["aggregate"], expectedSampledQueryDocs);
+ QuerySamplingUtil.clearSampledQueryCollectionOnAllShards(st);
+}
diff --git a/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js
new file mode 100644
index 00000000000..58efb7c3565
--- /dev/null
+++ b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js
@@ -0,0 +1,143 @@
+/**
+ * Tests basic support for sampling nested aggregate queries (i.e. ones inside $lookup,
+ * $graphLookup, $unionWith) against a sharded collection on a sharded cluster.
+ *
+ * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey]
+ */
+(function() {
+"use strict";
+
+load("jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js");
+load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
+
+const st = new ShardingTest({
+ shards: 3,
+ rs: {
+ nodes: 2,
+ setParameter:
+ {queryAnalysisSamplerConfigurationRefreshSecs, queryAnalysisWriterIntervalSecs}
+ },
+ // Disable query sampling on mongos to verify that the nested aggregate queries are sampled by
+ // the shard that routes them.
+ mongosOptions:
+ {setParameter: {"failpoint.disableQueryAnalysisSampler": tojson({mode: "alwaysOn"})}}
+});
+
+const dbName = "testDb";
+const localCollName = "testLocalColl";
+const foreignCollName = "testForeignColl";
+const foreignNs = dbName + "." + foreignCollName;
+const mongosDB = st.s.getDB(dbName);
+
+// Set up the local collection. It needs to have at least one document. Otherwise, no nested
+// aggregate queries would be issued.
+assert.commandWorked(mongosDB.getCollection(localCollName).insert([{a: 0}]));
+
+// Set up the foreign collection. Make it have three chunks:
+// shard0: [MinKey, 0]
+// shard1: [0, 1000]
+// shard1: [1000, MaxKey]
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.name);
+assert.commandWorked(st.s.adminCommand({shardCollection: foreignNs, key: {x: 1}}));
+assert.commandWorked(st.s.adminCommand({split: foreignNs, middle: {x: 0}}));
+assert.commandWorked(st.s.adminCommand({split: foreignNs, middle: {x: 1000}}));
+assert.commandWorked(
+ st.s.adminCommand({moveChunk: foreignNs, find: {x: 0}, to: st.shard1.shardName}));
+assert.commandWorked(
+ st.s.adminCommand({moveChunk: foreignNs, find: {x: 1000}, to: st.shard2.name}));
+
+assert.commandWorked(
+ st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "full", sampleRate: 1000}));
+QuerySamplingUtil.waitForActiveSamplingOnAllShards(st);
+
+for (let {name,
+ makeOuterPipelineFunc,
+ requireShardToRouteFunc,
+ supportCustomPipeline} of outerAggTestCases) {
+ const requireShardToRoute =
+ requireShardToRouteFunc(mongosDB, foreignCollName, true /* isShardedColl */);
+ if (supportCustomPipeline) {
+ for (let {makeInnerPipelineFunc, containInitialFilter} of innerAggTestCases) {
+ const filter0 = {x: 1, name};
+ // If the aggregation doesn't have an initial filter, it would be routed to all shards.
+ const shardNames0 =
+ containInitialFilter ? [st.rs1.name] : [st.rs0.name, st.rs1.name, st.rs2.name];
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter0,
+ shardNames0,
+ false /* explain */,
+ requireShardToRoute);
+
+ const filter1 = {x: {$gte: 2}, name};
+ // If the aggregation doesn't have an initial filter, it would be routed to all shards.
+ const shardNames1 = containInitialFilter ? [st.rs1.name, st.rs2.name]
+ : [st.rs0.name, st.rs1.name, st.rs2.name];
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter1,
+ shardNames1,
+ false /* explain */,
+ requireShardToRoute);
+
+ const filter2 = {x: 3, name};
+ const shardNames2 = [];
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter2,
+ shardNames2,
+ true /* explain */,
+ requireShardToRoute);
+
+ const filter3 = {x: {$gte: 4}, name};
+ const shardNames3 = [];
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter3,
+ shardNames3,
+ true /* explain */,
+ requireShardToRoute);
+ }
+ } else {
+ testNoCustomInnerPipeline(makeOuterPipelineFunc,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ false /* explain */,
+ requireShardToRoute);
+ testNoCustomInnerPipeline(makeOuterPipelineFunc,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ true /* explain */,
+ requireShardToRoute);
+ }
+}
+
+assert.commandWorked(st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "off"}));
+
+st.stop();
+})();
diff --git a/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js
new file mode 100644
index 00000000000..5b4626a12f1
--- /dev/null
+++ b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js
@@ -0,0 +1,103 @@
+/**
+ * Tests basic support for sampling nested aggregate queries (i.e. inside $lookup, $graphLookup,
+ * $unionWith) against an unsharded collection on a sharded cluster.
+ *
+ * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey]
+ */
+(function() {
+"use strict";
+
+load("jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js");
+load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 2,
+ setParameter:
+ {queryAnalysisSamplerConfigurationRefreshSecs, queryAnalysisWriterIntervalSecs}
+ },
+ // Disable query sampling on mongos to verify that the nested aggregate queries are sampled by
+ // the shard that routes them.
+ mongosOptions:
+ {setParameter: {"failpoint.disableQueryAnalysisSampler": tojson({mode: "alwaysOn"})}}
+});
+
+const dbName = "testDb";
+const localCollName = "testLocalColl";
+const foreignCollName = "testForeignColl";
+const foreignNs = dbName + "." + foreignCollName;
+const mongosDB = st.s.getDB(dbName);
+
+// Set up the local collection. It needs to have at least one document. Otherwise, no nested
+// aggregate queries will be issued.
+assert.commandWorked(mongosDB.getCollection(localCollName).insert([{a: 0}]));
+
+// Set up the foreign collection.
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.name);
+assert.commandWorked(mongosDB.createCollection(foreignCollName));
+
+assert.commandWorked(
+ st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "full", sampleRate: 1000}));
+QuerySamplingUtil.waitForActiveSamplingOnAllShards(st);
+
+// The foreign collection is unsharded so all documents are on the primary shard.
+const shardNames = [st.rs0.name];
+
+for (let {name,
+ makeOuterPipelineFunc,
+ requireShardToRouteFunc,
+ supportCustomPipeline} of outerAggTestCases) {
+ const requireShardToRoute =
+ requireShardToRouteFunc(mongosDB, foreignCollName, false /* isShardedColl */);
+ if (supportCustomPipeline) {
+ for (let {makeInnerPipelineFunc, containInitialFilter} of innerAggTestCases) {
+ const filter0 = {x: 1, name};
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter0,
+ shardNames,
+ false /* explain */,
+ requireShardToRoute);
+
+ const filter1 = {x: 2, name};
+ testCustomInnerPipeline(makeOuterPipelineFunc,
+ makeInnerPipelineFunc,
+ containInitialFilter,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ filter1,
+ shardNames,
+ true /* explain */,
+ requireShardToRoute);
+ }
+ } else {
+ testNoCustomInnerPipeline(makeOuterPipelineFunc,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ false /* explain */,
+ requireShardToRoute);
+ testNoCustomInnerPipeline(makeOuterPipelineFunc,
+ st,
+ dbName,
+ localCollName,
+ foreignCollName,
+ true /* explain */,
+ requireShardToRoute);
+ }
+}
+
+assert.commandWorked(st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "off"}));
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 78761b412bd..dd29e4816a9 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/sort_reorder_helpers.h"
#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/stats/counters.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/logv2/log.h"
@@ -654,6 +655,10 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
_unwind(unwindSrc),
_variables(expCtx->variables),
_variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
+ if (!_from.isOnInternalDb()) {
+ globalOpCounters.gotNestedAggregate();
+ }
+
const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_from);
_fromExpCtx = pExpCtx->copyForSubPipeline(resolvedNamespace.ns, resolvedNamespace.uuid);
_fromExpCtx->inLookup = true;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 852ce00d519..54189a47166 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/pipeline/document_source_graph_lookup.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h"
+#include "mongo/db/stats/counters.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
@@ -714,6 +715,46 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo
ASSERT(graphLookupStage->getNext().isEOF());
}
+TEST_F(DocumentSourceGraphLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) {
+ auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) {
+ auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{
+ {nss.coll().toString(), {nss, std::vector<BSONObj>()}}};
+ auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load();
+
+ // Create a DocumentSourceGraphLookUp and verify that the counter increases by the expected
+ // amount.
+ auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ originalExpCtx->setResolvedNamespaces(resolvedNss);
+ auto docSource = DocumentSourceGraphLookUp::createFromBson(
+ BSON("$graphLookup" << BSON("from" << nss.coll() << "startWith"
+ << "$x"
+ << "connectFromField"
+ << "id"
+ << "connectToField"
+ << "id"
+ << "as"
+ << "connections"))
+ .firstElement(),
+ originalExpCtx);
+ auto originalGraphLookup = static_cast<DocumentSourceGraphLookUp*>(docSource.get());
+ auto countAfterCreate = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease);
+
+ // Copy the DocumentSourceGraphLookUp and verify that the counter doesn't increase.
+ auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ newExpCtx->setResolvedNamespaces(resolvedNss);
+ DocumentSourceGraphLookUp newGraphLookup{*originalGraphLookup, newExpCtx};
+ auto countAfterCopy = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCopy - countAfterCreate, 0);
+ };
+
+ testOpCounter(NamespaceString{"testDb", "testColl"}, 1);
+ // $graphLookup against internal databases should not cause the counter to get incremented.
+ testOpCounter(NamespaceString{"config", "testColl"}, 0);
+ testOpCounter(NamespaceString{"admin", "testColl"}, 0);
+ testOpCounter(NamespaceString{"local", "testColl"}, 0);
+}
+
using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 20253fec682..6ee70cefcb0 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/pipeline/variable_validation.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/stats/counters.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/overflow_arithmetic.h"
@@ -133,6 +134,10 @@ DocumentSourceLookUp::DocumentSourceLookUp(
_as(std::move(as)),
_variables(expCtx->variables),
_variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
+ if (!_fromNs.isOnInternalDb()) {
+ globalOpCounters.gotNestedAggregate();
+ }
+
const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs);
_resolvedNs = resolvedNamespace.ns;
_resolvedPipeline = resolvedNamespace.pipeline;
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 0fcad6cbb37..05b783f9ebb 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
namespace {
@@ -1424,6 +1425,41 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl
ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
}
+TEST_F(DocumentSourceLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) {
+ auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) {
+ auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{
+ {nss.coll().toString(), {nss, std::vector<BSONObj>()}}};
+ auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load();
+
+ // Create a DocumentSourceLookUp and verify that the counter increases by the expected
+ // amount.
+ auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ originalExpCtx->setResolvedNamespaces(resolvedNss);
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from" << nss.coll() << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) << "as"
+ << "as"))
+ .firstElement(),
+ originalExpCtx);
+ auto originalLookup = static_cast<DocumentSourceLookUp*>(docSource.get());
+ auto countAfterCreate = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease);
+
+ // Copy the DocumentSourceLookUp and verify that the counter doesn't increase.
+ auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ newExpCtx->setResolvedNamespaces(resolvedNss);
+ DocumentSourceLookUp newLookup{*originalLookup, newExpCtx};
+ auto countAfterCopy = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCopy - countAfterCreate, 0);
+ };
+
+ testOpCounter(NamespaceString{"testDb", "testColl"}, 1);
+ // $lookup against internal databases should not cause the counter to get incremented.
+ testOpCounter(NamespaceString{"config", "testColl"}, 0);
+ testOpCounter(NamespaceString{"admin", "testColl"}, 0);
+ testOpCounter(NamespaceString{"local", "testColl"}, 0);
+}
+
using DocumentSourceLookUpServerlessTest = ServerlessAggregationContextFixture;
TEST_F(DocumentSourceLookUpServerlessTest,
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index c9ac829c7a0..ab57f8f7cf2 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -35,6 +35,7 @@
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/stage_constraints.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
@@ -63,6 +64,11 @@ public:
DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
: DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {
+ if (!_pipeline->getContext()->ns.isOnInternalDb()) {
+ globalOpCounters.gotNestedAggregate();
+ }
+ _pipeline->getContext()->inUnionWith = true;
+
// If this pipeline is being run as part of explain, then cache a copy to use later during
// serialization.
if (expCtx->explain >= ExplainOptions::Verbosity::kExecStats) {
@@ -74,7 +80,9 @@ public:
const boost::intrusive_ptr<ExpressionContext>& newExpCtx)
: DocumentSource(kStageName,
newExpCtx ? newExpCtx : original.pExpCtx->copyWith(original.pExpCtx->ns)),
- _pipeline(original._pipeline->clone()) {}
+ _pipeline(original._pipeline->clone()) {
+ _pipeline->getContext()->inUnionWith = true;
+ }
~DocumentSourceUnionWith();
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
index 05e0feb7baa..50f9047bd0d 100644
--- a/src/mongo/db/pipeline/document_source_union_with_test.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -588,6 +588,40 @@ TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInhe
ASSERT_TRUE(unionStage.constraints(Pipeline::SplitState::kUnsplit) == expectedConstraints);
}
+TEST_F(DocumentSourceUnionWithTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) {
+ auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) {
+ auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{
+ {nss.coll().toString(), {nss, std::vector<BSONObj>()}}};
+ auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load();
+
+ // Create a DocumentSourceUnionWith and verify that the counter increases by the expected
+ // amount.
+ auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ originalExpCtx->setResolvedNamespaces(resolvedNss);
+ auto docSource = DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << BSON("coll" << nss.coll() << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))))
+ .firstElement(),
+ originalExpCtx);
+ auto originalUnionWith = static_cast<DocumentSourceUnionWith*>(docSource.get());
+ auto countAfterCreate = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease);
+
+ // Copy the DocumentSourceUnionWith and verify that the counter doesn't increase.
+ auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss);
+ newExpCtx->setResolvedNamespaces(resolvedNss);
+ DocumentSourceUnionWith newUnionWith{*originalUnionWith, newExpCtx};
+ auto countAfterCopy = globalOpCounters.getNestedAggregate()->load();
+ ASSERT_EQ(countAfterCopy - countAfterCreate, 0);
+ };
+
+ testOpCounter(NamespaceString{"testDb", "testColl"}, 1);
+ // $unionWith against internal databases should not cause the counter to get incremented.
+ testOpCounter(NamespaceString{"config", "testColl"}, 0);
+ testOpCounter(NamespaceString{"admin", "testColl"}, 0);
+ testOpCounter(NamespaceString{"local", "testColl"}, 0);
+}
+
using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture;
TEST_F(DocumentSourceUnionWithServerlessTest,
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 8629311709a..3ec3554a336 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -300,6 +300,14 @@ public:
return tailableMode == TailableModeEnum::kTailableAndAwaitData;
}
+ /**
+ * Returns true if the pipeline is eligible for query sampling. That is, it is not an explain
+ * and either it is not nested or it is nested inside $lookup, $graphLookup and $unionWith.
+ */
+ bool eligibleForSampling() const {
+ return !explain && (subPipelineDepth == 0 || inLookup || inUnionWith);
+ }
+
void setResolvedNamespaces(StringMap<ResolvedNamespace> resolvedNamespaces) {
_resolvedNamespaces = std::move(resolvedNamespaces);
}
@@ -440,6 +448,9 @@ public:
// True if this 'ExpressionContext' object is for the inner side of a $lookup or $graphLookup.
bool inLookup = false;
+ // True if this 'ExpressionContext' object is for the inner side of a $unionWith.
+ bool inUnionWith = false;
+
// If set, this will disallow use of features introduced in versions above the provided version.
boost::optional<multiversion::FeatureCompatibilityVersion> maxFeatureCompatibilityVersion;
diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript
index c590971845a..769d2476394 100644
--- a/src/mongo/db/pipeline/process_interface/SConscript
+++ b/src/mongo/db/pipeline/process_interface/SConscript
@@ -56,10 +56,12 @@ env.Library(
'$BUILD_DIR/mongo/db/operation_time_tracker',
'$BUILD_DIR/mongo/db/ops/write_ops',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
+ '$BUILD_DIR/mongo/db/s/query_analysis_writer',
'$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/stats/fill_locker_info',
'$BUILD_DIR/mongo/db/storage/backup_cursor_hooks',
+ '$BUILD_DIR/mongo/s/query_analysis_sampler',
'$BUILD_DIR/mongo/scripting/scripting_common',
],
)
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index 430378b2a41..62dbd96b5ab 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/sbe_plan_cache.h"
#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/transaction_coordinator_curop.h"
#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
@@ -74,6 +75,7 @@
#include "mongo/logv2/log.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/query_analysis_sampler_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -423,6 +425,15 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline*
return pipeline;
}
+ if (expCtx->eligibleForSampling()) {
+ if (auto sampleId = analyze_shard_key::tryGenerateSampleId(expCtx->opCtx, expCtx->ns)) {
+ analyze_shard_key::QueryAnalysisWriter::get(expCtx->opCtx)
+ .addAggregateQuery(
+ *sampleId, expCtx->ns, pipeline->getInitialQuery(), expCtx->getCollatorBSON())
+ .getAsync([](auto) {});
+ }
+ }
+
boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> autoColl;
const NamespaceStringOrUUID nsOrUUID =
expCtx->uuid ? NamespaceStringOrUUID{expCtx->ns.dbName(), *expCtx->uuid} : expCtx->ns;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index e9d6a35c5ad..6345ed858ee 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -786,7 +786,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
hasChangeStream,
startsWithDocuments,
- !aggRequest.getExplain() /* eligibleForSampling */,
+ expCtx->eligibleForSampling(),
std::move(pipeline),
shardTargetingPolicy,
std::move(readConcern));
@@ -1495,7 +1495,7 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) {
dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
hasChangeStream,
startsWithDocuments,
- false /* eligibleForSampling */,
+ expCtx->eligibleForSampling(),
std::move(pipeline));
BSONObjBuilder explainBuilder;
auto appendStatus =
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index db9c79c432a..c5fdeb633be 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -44,14 +44,23 @@ env.Library(
)
env.Library(
+ target='query_analysis_sampler',
+ source=[
+ 'query_analysis_sampler.cpp',
+ 'query_analysis_sampler_util.cpp',
+ 'query_analysis_server_status.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ 'grid',
+ ],
+)
+
+env.Library(
target='sharding_router_api',
source=[
'cluster_commands_helpers.cpp',
'collection_uuid_mismatch.cpp',
'multi_statement_transaction_requests_sender.cpp',
- 'query_analysis_sampler.cpp',
- 'query_analysis_sampler_util.cpp',
- 'query_analysis_server_status.cpp',
'router_transactions_metrics.cpp',
'router_transactions_stats.idl',
'router.cpp',
@@ -70,6 +79,7 @@ env.Library(
'$BUILD_DIR/mongo/db/shared_request_handling',
'async_requests_sender',
'grid',
+ 'query_analysis_sampler',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/collection_uuid_mismatch_info',
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index ed04f6592ff..0e26dd1b5a5 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -169,7 +169,6 @@ bool runAggregationMapReduce(OperationContext* opCtx,
auto cm = uassertStatusOK(
sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace()));
auto expCtx = makeExpressionContext(opCtx, parsedMr, cm, verbosity);
- const bool eligibleForSampling = !expCtx->explain;
const auto pipelineBuilder = [&]() {
return map_reduce_common::translateFromMR(parsedMr, expCtx);
@@ -202,15 +201,15 @@ bool runAggregationMapReduce(OperationContext* opCtx,
// needed in the normal aggregation path. For this translation, though, we need to
// build the pipeline to serialize and send to the primary shard.
auto serialized = serializeToCommand(cmd, parsedMr, pipelineBuilder().get());
- uassertStatusOK(
- cluster_aggregation_planner::runPipelineOnPrimaryShard(expCtx,
- namespaces,
- *targeter.cm,
- verbosity,
- std::move(serialized),
- privileges,
- eligibleForSampling,
- &tempResults));
+ uassertStatusOK(cluster_aggregation_planner::runPipelineOnPrimaryShard(
+ expCtx,
+ namespaces,
+ *targeter.cm,
+ verbosity,
+ std::move(serialized),
+ privileges,
+ expCtx->eligibleForSampling(),
+ &tempResults));
break;
}
@@ -239,7 +238,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
&tempResults,
false /* hasChangeStream */,
false /* startsWithDocuments */,
- eligibleForSampling));
+ expCtx->eligibleForSampling()));
break;
}