diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-11-11 20:04:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-11 20:55:05 +0000 |
commit | baafd67539f65f9ab620f4d67431881013fa66b5 (patch) | |
tree | ef699833220ab64a1dd923e22321bca476390cdf /src/mongo/s/query/cluster_aggregation_planner.cpp | |
parent | df38314f471a57fd273a7c0d747dfc1aeb4feaea (diff) | |
download | mongo-baafd67539f65f9ab620f4d67431881013fa66b5.tar.gz |
SERVER-69801 Support sampling aggregate queries on sharded clusters
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 24 |
1 files changed, 20 insertions, 4 deletions
```diff
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index b4749d14329..b82eca67821 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -56,6 +56,7 @@
 #include "mongo/s/query/router_stage_remove_metadata_fields.h"
 #include "mongo/s/query/router_stage_skip.h"
 #include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/query_analysis_sampler_util.h"
 #include "mongo/s/shard_key_pattern.h"
 #include "mongo/s/transaction_router.h"
 
@@ -631,6 +632,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
                                  boost::optional<ExplainOptions::Verbosity> explain,
                                  Document serializedCommand,
                                  const PrivilegeVector& privileges,
+                                 bool eligibleForSampling,
                                  BSONObjBuilder* out) {
     return runPipelineOnSpecificShardOnly(expCtx,
                                           namespaces,
@@ -639,7 +641,8 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
                                           serializedCommand,
                                           privileges,
                                           cm.dbPrimary(),
-                                          false,
+                                          false /* forPerShardCursor */,
+                                          eligibleForSampling,
                                           out);
 }
 
@@ -680,11 +683,16 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
                                 const PrivilegeVector& privileges,
                                 BSONObjBuilder* result,
                                 bool hasChangeStream,
-                                bool startsWithDocuments) {
+                                bool startsWithDocuments,
+                                bool eligibleForSampling) {
     auto expCtx = targeter.pipeline->getContext();
     // If not, split the pipeline as necessary and dispatch to the relevant shards.
-    auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
-        serializedCommand, hasChangeStream, startsWithDocuments, std::move(targeter.pipeline));
+    auto shardDispatchResults =
+        sharded_agg_helpers::dispatchShardPipeline(serializedCommand,
+                                                   hasChangeStream,
+                                                   startsWithDocuments,
+                                                   eligibleForSampling,
+                                                   std::move(targeter.pipeline));
 
     // If the operation is an explain, then we verify that it succeeded on all targeted
     // shards, write the results to the output builder, and return immediately.
@@ -794,6 +802,7 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
                                       const PrivilegeVector& privileges,
                                       ShardId shardId,
                                       bool forPerShardCursor,
+                                      bool eligibleForSampling,
                                       BSONObjBuilder* out) {
     auto opCtx = expCtx->opCtx;
 
@@ -826,6 +835,13 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
         cmdObj = appendDbVersionIfPresent(std::move(cmdObj), *dbVersion);
     }
 
+    if (eligibleForSampling) {
+        if (auto sampleId =
+                analyze_shard_key::tryGenerateSampleId(opCtx, namespaces.executionNss)) {
+            cmdObj = analyze_shard_key::appendSampleId(std::move(cmdObj), std::move(*sampleId));
+        }
+    }
+
     MultiStatementTransactionRequestsSender ars(
         opCtx,
         Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
```