summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregation_planner.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-11-11 20:04:44 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-11 20:55:05 +0000
commitbaafd67539f65f9ab620f4d67431881013fa66b5 (patch)
treeef699833220ab64a1dd923e22321bca476390cdf /src/mongo/s/query/cluster_aggregation_planner.cpp
parentdf38314f471a57fd273a7c0d747dfc1aeb4feaea (diff)
downloadmongo-baafd67539f65f9ab620f4d67431881013fa66b5.tar.gz
SERVER-69801 Support sampling aggregate queries on sharded clusters
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp24
1 files changed, 20 insertions, 4 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index b4749d14329..b82eca67821 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -56,6 +56,7 @@
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/query_analysis_sampler_util.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/transaction_router.h"
@@ -631,6 +632,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
boost::optional<ExplainOptions::Verbosity> explain,
Document serializedCommand,
const PrivilegeVector& privileges,
+ bool eligibleForSampling,
BSONObjBuilder* out) {
return runPipelineOnSpecificShardOnly(expCtx,
namespaces,
@@ -639,7 +641,8 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
serializedCommand,
privileges,
cm.dbPrimary(),
- false,
+ false /* forPerShardCursor */,
+ eligibleForSampling,
out);
}
@@ -680,11 +683,16 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
const PrivilegeVector& privileges,
BSONObjBuilder* result,
bool hasChangeStream,
- bool startsWithDocuments) {
+ bool startsWithDocuments,
+ bool eligibleForSampling) {
auto expCtx = targeter.pipeline->getContext();
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, startsWithDocuments, std::move(targeter.pipeline));
+ auto shardDispatchResults =
+ sharded_agg_helpers::dispatchShardPipeline(serializedCommand,
+ hasChangeStream,
+ startsWithDocuments,
+ eligibleForSampling,
+ std::move(targeter.pipeline));
// If the operation is an explain, then we verify that it succeeded on all targeted
// shards, write the results to the output builder, and return immediately.
@@ -794,6 +802,7 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
const PrivilegeVector& privileges,
ShardId shardId,
bool forPerShardCursor,
+ bool eligibleForSampling,
BSONObjBuilder* out) {
auto opCtx = expCtx->opCtx;
@@ -826,6 +835,13 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
cmdObj = appendDbVersionIfPresent(std::move(cmdObj), *dbVersion);
}
+ if (eligibleForSampling) {
+ if (auto sampleId =
+ analyze_shard_key::tryGenerateSampleId(opCtx, namespaces.executionNss)) {
+ cmdObj = analyze_shard_key::appendSampleId(std::move(cmdObj), std::move(*sampleId));
+ }
+ }
+
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),