From a2f485cd414f38a3050267abbe10c907c2e6e106 Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Tue, 15 Nov 2022 17:31:43 +0000 Subject: SERVER-71141 Support sampling nested aggregate queries on sharded clusters --- .../analyze_shard_key/libs/query_sampling_util.js | 49 +++- .../libs/sample_nested_agg_queries_common.js | 305 +++++++++++++++++++++ .../sample_nested_agg_queries_sharded.js | 143 ++++++++++ .../sample_nested_agg_queries_unsharded.js | 103 +++++++ .../db/pipeline/document_source_graph_lookup.cpp | 5 + .../pipeline/document_source_graph_lookup_test.cpp | 41 +++ src/mongo/db/pipeline/document_source_lookup.cpp | 5 + .../db/pipeline/document_source_lookup_test.cpp | 36 +++ src/mongo/db/pipeline/document_source_union_with.h | 10 +- .../pipeline/document_source_union_with_test.cpp | 34 +++ src/mongo/db/pipeline/expression_context.h | 11 + src/mongo/db/pipeline/process_interface/SConscript | 2 + .../common_mongod_process_interface.cpp | 11 + src/mongo/db/pipeline/sharded_agg_helpers.cpp | 4 +- src/mongo/s/SConscript | 16 +- src/mongo/s/commands/cluster_map_reduce_agg.cpp | 21 +- 16 files changed, 772 insertions(+), 24 deletions(-) create mode 100644 jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js create mode 100644 jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js create mode 100644 jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js diff --git a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js index 51bc566f36c..10c9e8da807 100644 --- a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js +++ b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js @@ -27,15 +27,29 @@ var QuerySamplingUtil = (function() { } /** - * Waits for the given mongos to have one active collection for query sampling. + * Waits for the given node to have one active collection for query sampling. 
*/ - function waitForActiveSampling(mongosConn) { + function waitForActiveSampling(node) { assert.soon(() => { - const res = assert.commandWorked(mongosConn.adminCommand({serverStatus: 1})); + const res = assert.commandWorked(node.adminCommand({serverStatus: 1})); return res.queryAnalyzers.activeCollections == 1; }); } + /** + * Waits for all shard nodes to have one active collection for query sampling. + */ + function waitForActiveSamplingOnAllShards(st) { + st._rs.forEach(rs => { + rs.nodes.forEach(node => { + assert.soon(() => { + const res = assert.commandWorked(node.adminCommand({serverStatus: 1})); + return res.queryAnalyzers.activeCollections == 1; + }); + }); + }); + } + /** * Returns true if 'subsetObj' is a sub object of 'supersetObj'. That is, every key that exists * in 'subsetObj' also exists in 'supersetObj' and the values of that key in the two objects are @@ -133,18 +147,25 @@ var QuerySamplingUtil = (function() { {actualSampledQueryDocs, expectedSampledQueryDocs}); for (let {filter, shardNames, cmdName, cmdObj, diff} of expectedSampledQueryDocs) { + if (!filter) { + // The filter is not specified so skip verifying it. 
+ continue; + } let shardName = null; for (let rs of st._rs) { const primary = rs.test.getPrimary(); - const queryDoc = primary.getCollection(kSampledQueriesNs).findOne(filter); + const queryDocs = primary.getCollection(kSampledQueriesNs).find(filter).toArray(); if (shardName) { - assert.eq(queryDoc, - null, + assert.eq(queryDocs.length, + 0, "Found a sampled query on more than one shard " + tojson({shardNames: [shardName, rs.test.name], cmdName, cmdObj})); continue; - } else if (queryDoc) { + } else if (queryDocs.length > 0) { + assert.eq(queryDocs.length, 1, queryDocs); + const queryDoc = queryDocs[0]; + shardName = rs.test.name; assert(shardNames.includes(shardName), "Found a sampled query on an unexpected shard " + @@ -170,6 +191,18 @@ var QuerySamplingUtil = (function() { assert.eq(coll.find({ns}).itcount(), 0); } + function clearSampledQueryCollection(primary) { + const coll = primary.getCollection(kSampledQueriesNs); + assert.commandWorked(coll.remove({})); + } + + function clearSampledQueryCollectionOnAllShards(st) { + for (let rs of st._rs) { + const primary = rs.test.getPrimary(); + clearSampledQueryCollection(primary); + } + } + /** * Waits for the config.sampledQueriesDiff collection to have a document with _id equal to * 'sampleId' for the collection 'ns', and then asserts that the diff in that document matches @@ -210,9 +243,11 @@ var QuerySamplingUtil = (function() { generateRandomCollation, makeCmdObjIgnoreSessionInfo, waitForActiveSampling, + waitForActiveSamplingOnAllShards, assertSoonSampledQueryDocuments, assertSoonSampledQueryDocumentsAcrossShards, assertNoSampledQueryDocuments, + clearSampledQueryCollectionOnAllShards, assertSoonSingleSampledDiffDocument, assertNoSampledDiffDocuments, clearSampledDiffCollection diff --git a/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js b/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js new file mode 100644 index 00000000000..38a39b3ed2e --- /dev/null +++ 
b/jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js @@ -0,0 +1,305 @@ +/** + * Utilities for testing basic support for sampling nested aggregate queries (i.e. ones inside + * $lookup, $graphLookup, $unionWith) on a sharded cluster. + */ + +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. + +// Make the periodic jobs for refreshing sample rates and writing sampled queries and diffs have a +// period of 1 second to speed up the test. +const queryAnalysisSamplerConfigurationRefreshSecs = 1; +const queryAnalysisWriterIntervalSecs = 1; + +const outerAggTestCases = [ + // The test cases for singly-nested aggregate queries. + { + name: "lookup_custom_pipeline", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{$lookup: {from: foreignCollName, as: "joined", pipeline}}]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "lookup_non_custom_pipeline", + supportCustomPipeline: false, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [ + {$lookup: {from: foreignCollName, as: "joined", localField: "a", foreignField: "x"}} + ]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => { + // When SBE is enabled, if the collection is not sharded and not clustered, the shard + // will not create a separate pipeline to execute the inner side of a $lookup stage so + // there is no nested aggregate query to route. 
+ const listCollectionRes = + assert.commandWorked(db.runCommand({listCollections: 1, filter: {name: collName}})); + const isClusteredColl = + listCollectionRes.cursor.firstBatch[0].options.hasOwnProperty("clusteredIndex"); + const isEligibleForSBELookupPushdown = + checkSBEEnabled(db) && !isShardedColl && !isClusteredColl; + return !isEligibleForSBELookupPushdown; + } + }, + { + name: "unionWith", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{$unionWith: {coll: foreignCollName, pipeline}}]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "graphLookup", + supportCustomPipeline: false, + makeOuterPipelineFunc: (localCollName, foreignCollName) => { + return [{ + $graphLookup: { + from: foreignCollName, + startWith: "$x", + connectFromField: "x", + connectToField: "a", + maxDepth: 1, + as: "connected" + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + // The test cases for doubly-nested aggregate queries. 
+ { + name: "lookup+lookup", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{ + $lookup: { + from: localCollName, + as: "joined", + pipeline: [{ + $lookup: { + from: foreignCollName, + as: "joined", + pipeline + } + }] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "lookup+unionWith", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{ + $lookup: { + from: localCollName, + as: "joined", + pipeline: [{ + $unionWith: { + coll: foreignCollName, + pipeline + } + }] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "lookup+graphLookUp", + supportCustomPipeline: false, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{ + $lookup: { + from: localCollName, + as: "joined", + pipeline: [{ + $graphLookup: { + from: foreignCollName, + startWith: "$x", + connectFromField: "x", + connectToField: "a", + maxDepth: 1, + as: "connected" + } + }] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "unionWith+lookup", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{ + $unionWith: { + coll: localCollName, + pipeline: [{$lookup: {from: foreignCollName, as: "joined", pipeline}}] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "unionWith+unionWith", + supportCustomPipeline: true, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{ + $unionWith: { + coll: localCollName, + pipeline: [{$unionWith: {coll: foreignCollName, pipeline}}] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true + }, + { + name: "unionWith+graphLookup", + supportCustomPipeline: false, + makeOuterPipelineFunc: (localCollName, foreignCollName, 
pipeline) => { + return [{ + $unionWith: { + coll: localCollName, + pipeline: [{ + $graphLookup: { + from: foreignCollName, + startWith: "$x", + connectFromField: "x", + connectToField: "a", + maxDepth: 1, + as: "connected" + } + }] + } + }]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => true, + }, + { + name: "facet", + supportCustomPipeline: false, + makeOuterPipelineFunc: (localCollName, foreignCollName, pipeline) => { + return [{$facet: {foo: [{$match: {}}]}}]; + }, + requireShardToRouteFunc: (db, collName, isShardedColl) => false, + } +]; + +const innerAggTestCases = [ + { + // The filter is in the first stage. + containInitialFilter: true, + makeInnerPipelineFunc: (filter) => { + return [{$match: filter}]; + } + }, + { + // The filter is not in the first stage but the stage that it is in is moveable. + containInitialFilter: true, + makeInnerPipelineFunc: (filter) => { + return [{$sort: {x: -1}}, {$match: filter}]; + } + }, + { + // The filter is not in the first stage and the stage that it is in is not moveable. + containInitialFilter: false, + makeInnerPipelineFunc: (filter) => { + return [{$_internalInhibitOptimization: {}}, {$match: filter}]; + } + } +]; + +/** + * Tests that a nested aggregate query run internally by an aggregation stage that takes in a + * "pipeline" is sampled correctly. + */ +function testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter, + shardNames, + explain, + requireShardToRoute) { + const mongosDB = st.s.getDB(dbName); + const foreignNs = dbName + "." 
+ foreignCollName; + const foreignCollUuid = QuerySamplingUtil.getCollectionUuid(mongosDB, foreignCollName); + + let expectedSampledQueryDocs = []; + + const collation = QuerySamplingUtil.generateRandomCollation(); + const innerPipeline = makeInnerPipelineFunc(filter); + const outerPipeline = makeOuterPipelineFunc(localCollName, foreignCollName, innerPipeline); + const originalCmdObj = + {aggregate: localCollName, pipeline: outerPipeline, collation, cursor: {}}; + + jsTest.log("Testing command " + tojsononeline({originalCmdObj, explain})); + assert.commandWorked(mongosDB.runCommand(explain ? {explain: originalCmdObj} : originalCmdObj)); + + // Queries that are part of an 'explain' or that do not require shards to route should not get + // sampled. + if (!explain && requireShardToRoute) { + const expectedFilter = containInitialFilter ? filter : {}; + const expectedDoc = { + cmdName: "aggregate", + cmdObj: {filter: expectedFilter, collation}, + shardNames + }; + if (filter) { + expectedDoc.filter = {"cmd.filter": expectedFilter}; + } + expectedSampledQueryDocs.push(expectedDoc); + } + + sleep(queryAnalysisWriterIntervalSecs * 1000); + QuerySamplingUtil.assertSoonSampledQueryDocumentsAcrossShards( + st, foreignNs, foreignCollUuid, ["aggregate"], expectedSampledQueryDocs); + QuerySamplingUtil.clearSampledQueryCollectionOnAllShards(st); +} + +/** + * Tests that a nested aggregate query run internally by an aggregation stage that does not take in + * a "pipeline" is sampled correctly. + */ +function testNoCustomInnerPipeline(makeOuterPipelineFunc, + st, + dbName, + localCollName, + foreignCollName, + explain, + requireShardToRoute) { + const mongosDB = st.s.getDB(dbName); + const foreignNs = dbName + "." 
+ foreignCollName; + const foreignCollUuid = QuerySamplingUtil.getCollectionUuid(mongosDB, foreignCollName); + + let expectedSampledQueryDocs = []; + + const collation = QuerySamplingUtil.generateRandomCollation(); + const outerPipeline = makeOuterPipelineFunc(localCollName, foreignCollName); + const originalCmdObj = + {aggregate: localCollName, pipeline: outerPipeline, collation, cursor: {}}; + + jsTest.log("Testing command " + tojsononeline({originalCmdObj, explain})); + assert.commandWorked(mongosDB.runCommand(explain ? {explain: originalCmdObj} : originalCmdObj)); + + // Queries that are part of an 'explain' or that do not require shards to route should not get + // sampled. + if (!explain && requireShardToRoute) { + // Out of the aggregation stages above, the only stages that don't take in a custom + // pipeline are $graphLookup and $lookup (without a "pipeline" field). To avoid relying on + // the current format of the $match filter that they internally construct, skip verifying + // the filter and only verify that the query is present in the config.sampledQueries + // collection. + expectedSampledQueryDocs.push({}); + } + + sleep(queryAnalysisWriterIntervalSecs * 1000); + QuerySamplingUtil.assertSoonSampledQueryDocumentsAcrossShards( + st, foreignNs, foreignCollUuid, ["aggregate"], expectedSampledQueryDocs); + QuerySamplingUtil.clearSampledQueryCollectionOnAllShards(st); +} diff --git a/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js new file mode 100644 index 00000000000..58efb7c3565 --- /dev/null +++ b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_sharded.js @@ -0,0 +1,143 @@ +/** + * Tests basic support for sampling nested aggregate queries (i.e. ones inside $lookup, + * $graphLookup, $unionWith) against a sharded collection on a sharded cluster. 
+ * + * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey] + */ +(function() { +"use strict"; + +load("jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js"); +load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js"); + +const st = new ShardingTest({ + shards: 3, + rs: { + nodes: 2, + setParameter: + {queryAnalysisSamplerConfigurationRefreshSecs, queryAnalysisWriterIntervalSecs} + }, + // Disable query sampling on mongos to verify that the nested aggregate queries are sampled by + // the shard that routes them. + mongosOptions: + {setParameter: {"failpoint.disableQueryAnalysisSampler": tojson({mode: "alwaysOn"})}} +}); + +const dbName = "testDb"; +const localCollName = "testLocalColl"; +const foreignCollName = "testForeignColl"; +const foreignNs = dbName + "." + foreignCollName; +const mongosDB = st.s.getDB(dbName); + +// Set up the local collection. It needs to have at least one document. Otherwise, no nested +// aggregate queries would be issued. +assert.commandWorked(mongosDB.getCollection(localCollName).insert([{a: 0}])); + +// Set up the foreign collection. 
Make it have three chunks: +// shard0: [MinKey, 0] +// shard1: [0, 1000] +// shard2: [1000, MaxKey] +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(dbName, st.shard0.name); +assert.commandWorked(st.s.adminCommand({shardCollection: foreignNs, key: {x: 1}})); +assert.commandWorked(st.s.adminCommand({split: foreignNs, middle: {x: 0}})); +assert.commandWorked(st.s.adminCommand({split: foreignNs, middle: {x: 1000}})); +assert.commandWorked( + st.s.adminCommand({moveChunk: foreignNs, find: {x: 0}, to: st.shard1.shardName})); +assert.commandWorked( + st.s.adminCommand({moveChunk: foreignNs, find: {x: 1000}, to: st.shard2.name})); + +assert.commandWorked( + st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "full", sampleRate: 1000})); +QuerySamplingUtil.waitForActiveSamplingOnAllShards(st); + +for (let {name, + makeOuterPipelineFunc, + requireShardToRouteFunc, + supportCustomPipeline} of outerAggTestCases) { + const requireShardToRoute = + requireShardToRouteFunc(mongosDB, foreignCollName, true /* isShardedColl */); + if (supportCustomPipeline) { + for (let {makeInnerPipelineFunc, containInitialFilter} of innerAggTestCases) { + const filter0 = {x: 1, name}; + // If the aggregation doesn't have an initial filter, it would be routed to all shards. + const shardNames0 = + containInitialFilter ? [st.rs1.name] : [st.rs0.name, st.rs1.name, st.rs2.name]; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter0, + shardNames0, + false /* explain */, + requireShardToRoute); + + const filter1 = {x: {$gte: 2}, name}; + // If the aggregation doesn't have an initial filter, it would be routed to all shards. + const shardNames1 = containInitialFilter ? 
[st.rs1.name, st.rs2.name] + : [st.rs0.name, st.rs1.name, st.rs2.name]; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter1, + shardNames1, + false /* explain */, + requireShardToRoute); + + const filter2 = {x: 3, name}; + const shardNames2 = []; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter2, + shardNames2, + true /* explain */, + requireShardToRoute); + + const filter3 = {x: {$gte: 4}, name}; + const shardNames3 = []; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter3, + shardNames3, + true /* explain */, + requireShardToRoute); + } + } else { + testNoCustomInnerPipeline(makeOuterPipelineFunc, + st, + dbName, + localCollName, + foreignCollName, + false /* explain */, + requireShardToRoute); + testNoCustomInnerPipeline(makeOuterPipelineFunc, + st, + dbName, + localCollName, + foreignCollName, + true /* explain */, + requireShardToRoute); + } +} + +assert.commandWorked(st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "off"})); + +st.stop(); +})(); diff --git a/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js new file mode 100644 index 00000000000..5b4626a12f1 --- /dev/null +++ b/jstests/sharding/analyze_shard_key/sample_nested_agg_queries_unsharded.js @@ -0,0 +1,103 @@ +/** + * Tests basic support for sampling nested aggregate queries (i.e. inside $lookup, $graphLookup, + * $unionWith) against an unsharded collection on a sharded cluster. 
+ * + * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey] + */ +(function() { +"use strict"; + +load("jstests/sharding/analyze_shard_key/libs/sample_nested_agg_queries_common.js"); +load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js"); + +const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 2, + setParameter: + {queryAnalysisSamplerConfigurationRefreshSecs, queryAnalysisWriterIntervalSecs} + }, + // Disable query sampling on mongos to verify that the nested aggregate queries are sampled by + // the shard that routes them. + mongosOptions: + {setParameter: {"failpoint.disableQueryAnalysisSampler": tojson({mode: "alwaysOn"})}} +}); + +const dbName = "testDb"; +const localCollName = "testLocalColl"; +const foreignCollName = "testForeignColl"; +const foreignNs = dbName + "." + foreignCollName; +const mongosDB = st.s.getDB(dbName); + +// Set up the local collection. It needs to have at least one document. Otherwise, no nested +// aggregate queries will be issued. +assert.commandWorked(mongosDB.getCollection(localCollName).insert([{a: 0}])); + +// Set up the foreign collection. +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(dbName, st.shard0.name); +assert.commandWorked(mongosDB.createCollection(foreignCollName)); + +assert.commandWorked( + st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "full", sampleRate: 1000})); +QuerySamplingUtil.waitForActiveSamplingOnAllShards(st); + +// The foreign collection is unsharded so all documents are on the primary shard. 
+const shardNames = [st.rs0.name]; + +for (let {name, + makeOuterPipelineFunc, + requireShardToRouteFunc, + supportCustomPipeline} of outerAggTestCases) { + const requireShardToRoute = + requireShardToRouteFunc(mongosDB, foreignCollName, false /* isShardedColl */); + if (supportCustomPipeline) { + for (let {makeInnerPipelineFunc, containInitialFilter} of innerAggTestCases) { + const filter0 = {x: 1, name}; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter0, + shardNames, + false /* explain */, + requireShardToRoute); + + const filter1 = {x: 2, name}; + testCustomInnerPipeline(makeOuterPipelineFunc, + makeInnerPipelineFunc, + containInitialFilter, + st, + dbName, + localCollName, + foreignCollName, + filter1, + shardNames, + true /* explain */, + requireShardToRoute); + } + } else { + testNoCustomInnerPipeline(makeOuterPipelineFunc, + st, + dbName, + localCollName, + foreignCollName, + false /* explain */, + requireShardToRoute); + testNoCustomInnerPipeline(makeOuterPipelineFunc, + st, + dbName, + localCollName, + foreignCollName, + true /* explain */, + requireShardToRoute); + } +} + +assert.commandWorked(st.s.adminCommand({configureQueryAnalyzer: foreignNs, mode: "off"})); + +st.stop(); +})(); diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 78761b412bd..dd29e4816a9 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -46,6 +46,7 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/sort_reorder_helpers.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" @@ -654,6 +655,10 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( _unwind(unwindSrc), 
_variables(expCtx->variables), _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { + if (!_from.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_from); _fromExpCtx = pExpCtx->copyForSubPipeline(resolvedNamespace.ns, resolvedNamespace.uuid); _fromExpCtx->inLookup = true; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 852ce00d519..54189a47166 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" +#include "mongo/db/stats/counters.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" @@ -714,6 +715,46 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo ASSERT(graphLookupStage->getNext().isEOF()); } +TEST_F(DocumentSourceGraphLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap{ + {nss.coll().toString(), {nss, std::vector()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceGraphLookUp and verify that the counter increases by the expected + // amount. 
+ auto originalExpCtx = make_intrusive(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceGraphLookUp::createFromBson( + BSON("$graphLookup" << BSON("from" << nss.coll() << "startWith" + << "$x" + << "connectFromField" + << "id" + << "connectToField" + << "id" + << "as" + << "connections")) + .firstElement(), + originalExpCtx); + auto originalGraphLookup = static_cast(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceGraphLookUp and verify that the counter doesn't increase. + auto newExpCtx = make_intrusive(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceGraphLookUp newGraphLookup{*originalGraphLookup, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $graphLookup against internal databases should not cause the counter to get incremented. 
+ testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 20253fec682..6ee70cefcb0 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -47,6 +47,7 @@ #include "mongo/db/pipeline/variable_validation.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" @@ -133,6 +134,10 @@ DocumentSourceLookUp::DocumentSourceLookUp( _as(std::move(as)), _variables(expCtx->variables), _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { + if (!_fromNs.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs); _resolvedNs = resolvedNamespace.ns; _resolvedPipeline = resolvedNamespace.pipeline; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 0fcad6cbb37..05b783f9ebb 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_options.h" +#include "mongo/db/stats/counters.h" namespace mongo { namespace { @@ -1424,6 +1425,41 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), 
Value(BSONArray(expectedPipe))); } +TEST_F(DocumentSourceLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap{ + {nss.coll().toString(), {nss, std::vector()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceLookUp and verify that the counter increases by the expected + // amount. + auto originalExpCtx = make_intrusive(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" << nss.coll() << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) << "as" + << "as")) + .firstElement(), + originalExpCtx); + auto originalLookup = static_cast(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceLookUp and verify that the counter doesn't increase. + auto newExpCtx = make_intrusive(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceLookUp newLookup{*originalLookup, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $lookup against internal databases should not cause the counter to get incremented. 
+ testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceLookUpServerlessTest = ServerlessAggregationContextFixture; TEST_F(DocumentSourceLookUpServerlessTest, diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index c9ac829c7a0..ab57f8f7cf2 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/stage_constraints.h" +#include "mongo/db/stats/counters.h" namespace mongo { @@ -63,6 +64,11 @@ public: DocumentSourceUnionWith(const boost::intrusive_ptr& expCtx, std::unique_ptr pipeline) : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) { + if (!_pipeline->getContext()->ns.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + _pipeline->getContext()->inUnionWith = true; + // If this pipeline is being run as part of explain, then cache a copy to use later during // serialization. if (expCtx->explain >= ExplainOptions::Verbosity::kExecStats) { @@ -74,7 +80,9 @@ public: const boost::intrusive_ptr& newExpCtx) : DocumentSource(kStageName, newExpCtx ? 
newExpCtx : original.pExpCtx->copyWith(original.pExpCtx->ns)), - _pipeline(original._pipeline->clone()) {} + _pipeline(original._pipeline->clone()) { + _pipeline->getContext()->inUnionWith = true; + } ~DocumentSourceUnionWith(); diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index 05e0feb7baa..50f9047bd0d 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -588,6 +588,40 @@ TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInhe ASSERT_TRUE(unionStage.constraints(Pipeline::SplitState::kUnsplit) == expectedConstraints); } +TEST_F(DocumentSourceUnionWithTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap{ + {nss.coll().toString(), {nss, std::vector()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceUnionWith and verify that the counter increases by the expected + // amount. + auto originalExpCtx = make_intrusive(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceUnionWith::createFromBson( + BSON("$unionWith" << BSON("coll" << nss.coll() << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))))) + .firstElement(), + originalExpCtx); + auto originalUnionWith = static_cast(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceUnionWith and verify that the counter doesn't increase. 
+ auto newExpCtx = make_intrusive(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceUnionWith newUnionWith{*originalUnionWith, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $unionWith against internal databases should not cause the counter to get incremented. + testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture; TEST_F(DocumentSourceUnionWithServerlessTest, diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 8629311709a..3ec3554a336 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -300,6 +300,14 @@ public: return tailableMode == TailableModeEnum::kTailableAndAwaitData; } + /** + * Returns true if the pipeline is eligible for query sampling. That is, it is not an explain + * and either it is not nested or it is nested inside $lookup, $graphLookup and $unionWith. + */ + bool eligibleForSampling() const { + return !explain && (subPipelineDepth == 0 || inLookup || inUnionWith); + } + void setResolvedNamespaces(StringMap resolvedNamespaces) { _resolvedNamespaces = std::move(resolvedNamespaces); } @@ -440,6 +448,9 @@ public: // True if this 'ExpressionContext' object is for the inner side of a $lookup or $graphLookup. bool inLookup = false; + // True if this 'ExpressionContext' object is for the inner side of a $unionWith. + bool inUnionWith = false; + // If set, this will disallow use of features introduced in versions above the provided version. 
boost::optional maxFeatureCompatibilityVersion; diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript index c590971845a..769d2476394 100644 --- a/src/mongo/db/pipeline/process_interface/SConscript +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -56,10 +56,12 @@ env.Library( '$BUILD_DIR/mongo/db/operation_time_tracker', '$BUILD_DIR/mongo/db/ops/write_ops', '$BUILD_DIR/mongo/db/repl/primary_only_service', + '$BUILD_DIR/mongo/db/s/query_analysis_writer', '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/stats/fill_locker_info', '$BUILD_DIR/mongo/db/storage/backup_cursor_hooks', + '$BUILD_DIR/mongo/s/query_analysis_sampler', '$BUILD_DIR/mongo/scripting/scripting_common', ], ) diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 430378b2a41..62dbd96b5ab 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -59,6 +59,7 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/sbe_plan_cache.h" #include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/transaction_coordinator_curop.h" #include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" @@ -74,6 +75,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/query_analysis_sampler_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -423,6 +425,15 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* return pipeline; } + if (expCtx->eligibleForSampling()) { + if (auto 
sampleId = analyze_shard_key::tryGenerateSampleId(expCtx->opCtx, expCtx->ns)) { + analyze_shard_key::QueryAnalysisWriter::get(expCtx->opCtx) + .addAggregateQuery( + *sampleId, expCtx->ns, pipeline->getInitialQuery(), expCtx->getCollatorBSON()) + .getAsync([](auto) {}); + } + } + boost::optional autoColl; const NamespaceStringOrUUID nsOrUUID = expCtx->uuid ? NamespaceStringOrUUID{expCtx->ns.dbName(), *expCtx->uuid} : expCtx->ns; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e9d6a35c5ad..6345ed858ee 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -786,7 +786,7 @@ std::unique_ptr targetShardsAndAddMergeCursors( dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, startsWithDocuments, - !aggRequest.getExplain() /* eligibleForSampling */, + expCtx->eligibleForSampling(), std::move(pipeline), shardTargetingPolicy, std::move(readConcern)); @@ -1495,7 +1495,7 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, startsWithDocuments, - false /* eligibleForSampling */, + expCtx->eligibleForSampling(), std::move(pipeline)); BSONObjBuilder explainBuilder; auto appendStatus = diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index db9c79c432a..c5fdeb633be 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -43,15 +43,24 @@ env.Library( ], ) +env.Library( + target='query_analysis_sampler', + source=[ + 'query_analysis_sampler.cpp', + 'query_analysis_sampler_util.cpp', + 'query_analysis_server_status.cpp', + ], + LIBDEPS_PRIVATE=[ + 'grid', + ], +) + env.Library( target='sharding_router_api', source=[ 'cluster_commands_helpers.cpp', 'collection_uuid_mismatch.cpp', 'multi_statement_transaction_requests_sender.cpp', - 'query_analysis_sampler.cpp', - 
'query_analysis_sampler_util.cpp', - 'query_analysis_server_status.cpp', 'router_transactions_metrics.cpp', 'router_transactions_stats.idl', 'router.cpp', @@ -70,6 +79,7 @@ env.Library( '$BUILD_DIR/mongo/db/shared_request_handling', 'async_requests_sender', 'grid', + 'query_analysis_sampler', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/collection_uuid_mismatch_info', diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index ed04f6592ff..0e26dd1b5a5 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -169,7 +169,6 @@ bool runAggregationMapReduce(OperationContext* opCtx, auto cm = uassertStatusOK( sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace())); auto expCtx = makeExpressionContext(opCtx, parsedMr, cm, verbosity); - const bool eligibleForSampling = !expCtx->explain; const auto pipelineBuilder = [&]() { return map_reduce_common::translateFromMR(parsedMr, expCtx); @@ -202,15 +201,15 @@ bool runAggregationMapReduce(OperationContext* opCtx, // needed in the normal aggregation path. For this translation, though, we need to // build the pipeline to serialize and send to the primary shard. 
auto serialized = serializeToCommand(cmd, parsedMr, pipelineBuilder().get()); - uassertStatusOK( - cluster_aggregation_planner::runPipelineOnPrimaryShard(expCtx, - namespaces, - *targeter.cm, - verbosity, - std::move(serialized), - privileges, - eligibleForSampling, - &tempResults)); + uassertStatusOK(cluster_aggregation_planner::runPipelineOnPrimaryShard( + expCtx, + namespaces, + *targeter.cm, + verbosity, + std::move(serialized), + privileges, + expCtx->eligibleForSampling(), + &tempResults)); break; } @@ -239,7 +238,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, &tempResults, false /* hasChangeStream */, false /* startsWithDocuments */, - eligibleForSampling)); + expCtx->eligibleForSampling())); break; } -- cgit v1.2.1