diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup_test.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 21 |
12 files changed, 179 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 78761b412bd..dd29e4816a9 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -46,6 +46,7 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/sort_reorder_helpers.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" @@ -654,6 +655,10 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( _unwind(unwindSrc), _variables(expCtx->variables), _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { + if (!_from.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_from); _fromExpCtx = pExpCtx->copyForSubPipeline(resolvedNamespace.ns, resolvedNamespace.uuid); _fromExpCtx->inLookup = true; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 852ce00d519..54189a47166 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" +#include "mongo/db/stats/counters.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" @@ -714,6 +715,46 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo ASSERT(graphLookupStage->getNext().isEOF()); } +TEST_F(DocumentSourceGraphLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{ + {nss.coll().toString(), {nss, std::vector<BSONObj>()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceGraphLookUp and verify that the counter increases by the expected + // amount. + auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceGraphLookUp::createFromBson( + BSON("$graphLookup" << BSON("from" << nss.coll() << "startWith" + << "$x" + << "connectFromField" + << "id" + << "connectToField" + << "id" + << "as" + << "connections")) + .firstElement(), + originalExpCtx); + auto originalGraphLookup = static_cast<DocumentSourceGraphLookUp*>(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceGraphLookUp and verify that the counter doesn't increase. + auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceGraphLookUp newGraphLookup{*originalGraphLookup, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $graphLookup against internal databases should not cause the counter to get incremented. + testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 20253fec682..6ee70cefcb0 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -47,6 +47,7 @@ #include "mongo/db/pipeline/variable_validation.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" @@ -133,6 +134,10 @@ DocumentSourceLookUp::DocumentSourceLookUp( _as(std::move(as)), _variables(expCtx->variables), _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { + if (!_fromNs.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs); _resolvedNs = resolvedNamespace.ns; _resolvedPipeline = resolvedNamespace.pipeline; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 0fcad6cbb37..05b783f9ebb 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_options.h" +#include "mongo/db/stats/counters.h" namespace mongo { namespace { @@ -1424,6 +1425,41 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); } +TEST_F(DocumentSourceLookUpTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{ + {nss.coll().toString(), {nss, std::vector<BSONObj>()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceLookUp and verify that the counter increases by the expected + // amount. + auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" << nss.coll() << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) << "as" + << "as")) + .firstElement(), + originalExpCtx); + auto originalLookup = static_cast<DocumentSourceLookUp*>(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceLookUp and verify that the counter doesn't increase. + auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceLookUp newLookup{*originalLookup, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $lookup against internal databases should not cause the counter to get incremented. + testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceLookUpServerlessTest = ServerlessAggregationContextFixture; TEST_F(DocumentSourceLookUpServerlessTest, diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index c9ac829c7a0..ab57f8f7cf2 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/stage_constraints.h" +#include "mongo/db/stats/counters.h" namespace mongo { @@ -63,6 +64,11 @@ public: DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) { + if (!_pipeline->getContext()->ns.isOnInternalDb()) { + globalOpCounters.gotNestedAggregate(); + } + _pipeline->getContext()->inUnionWith = true; + // If this pipeline is being run as part of explain, then cache a copy to use later during // serialization. if (expCtx->explain >= ExplainOptions::Verbosity::kExecStats) { @@ -74,7 +80,9 @@ public: const boost::intrusive_ptr<ExpressionContext>& newExpCtx) : DocumentSource(kStageName, newExpCtx ? newExpCtx : original.pExpCtx->copyWith(original.pExpCtx->ns)), - _pipeline(original._pipeline->clone()) {} + _pipeline(original._pipeline->clone()) { + _pipeline->getContext()->inUnionWith = true; + } ~DocumentSourceUnionWith(); diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index 05e0feb7baa..50f9047bd0d 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -588,6 +588,40 @@ TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInhe ASSERT_TRUE(unionStage.constraints(Pipeline::SplitState::kUnsplit) == expectedConstraints); } +TEST_F(DocumentSourceUnionWithTest, IncrementNestedAggregateOpCounterOnCreateButNotOnCopy) { + auto testOpCounter = [&](const NamespaceString& nss, const int expectedIncrease) { + auto resolvedNss = StringMap<ExpressionContext::ResolvedNamespace>{ + {nss.coll().toString(), {nss, std::vector<BSONObj>()}}}; + auto countBeforeCreate = globalOpCounters.getNestedAggregate()->load(); + + // Create a DocumentSourceUnionWith and verify that the counter increases by the expected + // amount. + auto originalExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + originalExpCtx->setResolvedNamespaces(resolvedNss); + auto docSource = DocumentSourceUnionWith::createFromBson( + BSON("$unionWith" << BSON("coll" << nss.coll() << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))))) + .firstElement(), + originalExpCtx); + auto originalUnionWith = static_cast<DocumentSourceUnionWith*>(docSource.get()); + auto countAfterCreate = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCreate - countBeforeCreate, expectedIncrease); + + // Copy the DocumentSourceUnionWith and verify that the counter doesn't increase. + auto newExpCtx = make_intrusive<ExpressionContextForTest>(getOpCtx(), nss); + newExpCtx->setResolvedNamespaces(resolvedNss); + DocumentSourceUnionWith newUnionWith{*originalUnionWith, newExpCtx}; + auto countAfterCopy = globalOpCounters.getNestedAggregate()->load(); + ASSERT_EQ(countAfterCopy - countAfterCreate, 0); + }; + + testOpCounter(NamespaceString{"testDb", "testColl"}, 1); + // $unionWith against internal databases should not cause the counter to get incremented. + testOpCounter(NamespaceString{"config", "testColl"}, 0); + testOpCounter(NamespaceString{"admin", "testColl"}, 0); + testOpCounter(NamespaceString{"local", "testColl"}, 0); +} + using DocumentSourceUnionWithServerlessTest = ServerlessAggregationContextFixture; TEST_F(DocumentSourceUnionWithServerlessTest, diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 8629311709a..3ec3554a336 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -300,6 +300,14 @@ public: return tailableMode == TailableModeEnum::kTailableAndAwaitData; } + /** + * Returns true if the pipeline is eligible for query sampling. That is, it is not an explain + * and either it is not nested or it is nested inside $lookup, $graphLookup and $unionWith. + */ + bool eligibleForSampling() const { + return !explain && (subPipelineDepth == 0 || inLookup || inUnionWith); + } + void setResolvedNamespaces(StringMap<ResolvedNamespace> resolvedNamespaces) { _resolvedNamespaces = std::move(resolvedNamespaces); } @@ -440,6 +448,9 @@ public: // True if this 'ExpressionContext' object is for the inner side of a $lookup or $graphLookup. bool inLookup = false; + // True if this 'ExpressionContext' object is for the inner side of a $unionWith. + bool inUnionWith = false; + // If set, this will disallow use of features introduced in versions above the provided version. boost::optional<multiversion::FeatureCompatibilityVersion> maxFeatureCompatibilityVersion; diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript index c590971845a..769d2476394 100644 --- a/src/mongo/db/pipeline/process_interface/SConscript +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -56,10 +56,12 @@ env.Library( '$BUILD_DIR/mongo/db/operation_time_tracker', '$BUILD_DIR/mongo/db/ops/write_ops', '$BUILD_DIR/mongo/db/repl/primary_only_service', + '$BUILD_DIR/mongo/db/s/query_analysis_writer', '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/stats/fill_locker_info', '$BUILD_DIR/mongo/db/storage/backup_cursor_hooks', + '$BUILD_DIR/mongo/s/query_analysis_sampler', '$BUILD_DIR/mongo/scripting/scripting_common', ], ) diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 430378b2a41..62dbd96b5ab 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -59,6 +59,7 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/sbe_plan_cache.h" #include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/transaction_coordinator_curop.h" #include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" @@ -74,6 +75,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/query_analysis_sampler_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -423,6 +425,15 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* return pipeline; } + if (expCtx->eligibleForSampling()) { + if (auto sampleId = analyze_shard_key::tryGenerateSampleId(expCtx->opCtx, expCtx->ns)) { + analyze_shard_key::QueryAnalysisWriter::get(expCtx->opCtx) + .addAggregateQuery( + *sampleId, expCtx->ns, pipeline->getInitialQuery(), expCtx->getCollatorBSON()) + .getAsync([](auto) {}); + } + } + boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> autoColl; const NamespaceStringOrUUID nsOrUUID = expCtx->uuid ? NamespaceStringOrUUID{expCtx->ns.dbName(), *expCtx->uuid} : expCtx->ns; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e9d6a35c5ad..6345ed858ee 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -786,7 +786,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, startsWithDocuments, - !aggRequest.getExplain() /* eligibleForSampling */, + expCtx->eligibleForSampling(), std::move(pipeline), shardTargetingPolicy, std::move(readConcern)); @@ -1495,7 +1495,7 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, startsWithDocuments, - false /* eligibleForSampling */, + expCtx->eligibleForSampling(), std::move(pipeline)); BSONObjBuilder explainBuilder; auto appendStatus = diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index db9c79c432a..c5fdeb633be 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -44,14 +44,23 @@ env.Library( ) env.Library( + target='query_analysis_sampler', + source=[ + 'query_analysis_sampler.cpp', + 'query_analysis_sampler_util.cpp', + 'query_analysis_server_status.cpp', + ], + LIBDEPS_PRIVATE=[ + 'grid', + ], +) + +env.Library( target='sharding_router_api', source=[ 'cluster_commands_helpers.cpp', 'collection_uuid_mismatch.cpp', 'multi_statement_transaction_requests_sender.cpp', - 'query_analysis_sampler.cpp', - 'query_analysis_sampler_util.cpp', - 'query_analysis_server_status.cpp', 'router_transactions_metrics.cpp', 'router_transactions_stats.idl', 'router.cpp', @@ -70,6 +79,7 @@ env.Library( '$BUILD_DIR/mongo/db/shared_request_handling', 'async_requests_sender', 'grid', + 'query_analysis_sampler', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/collection_uuid_mismatch_info', diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index ed04f6592ff..0e26dd1b5a5 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -169,7 +169,6 @@ bool runAggregationMapReduce(OperationContext* opCtx, auto cm = uassertStatusOK( sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace())); auto expCtx = makeExpressionContext(opCtx, parsedMr, cm, verbosity); - const bool eligibleForSampling = !expCtx->explain; const auto pipelineBuilder = [&]() { return map_reduce_common::translateFromMR(parsedMr, expCtx); @@ -202,15 +201,15 @@ bool runAggregationMapReduce(OperationContext* opCtx, // needed in the normal aggregation path. For this translation, though, we need to // build the pipeline to serialize and send to the primary shard. auto serialized = serializeToCommand(cmd, parsedMr, pipelineBuilder().get()); - uassertStatusOK( - cluster_aggregation_planner::runPipelineOnPrimaryShard(expCtx, - namespaces, - *targeter.cm, - verbosity, - std::move(serialized), - privileges, - eligibleForSampling, - &tempResults)); + uassertStatusOK(cluster_aggregation_planner::runPipelineOnPrimaryShard( + expCtx, + namespaces, + *targeter.cm, + verbosity, + std::move(serialized), + privileges, + expCtx->eligibleForSampling(), + &tempResults)); break; } @@ -239,7 +238,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, &tempResults, false /* hasChangeStream */, false /* startsWithDocuments */, - eligibleForSampling)); + expCtx->eligibleForSampling())); break; } |