diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.h | 3 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 8 |
3 files changed, 23 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index a7ba134a672..45106690b43 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -816,13 +816,24 @@ BSONObj createPassthroughCommandForShard( boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, BSONObj collationObj, - boost::optional<BSONObj> readConcern) { + boost::optional<BSONObj> readConcern, + boost::optional<int> overrideBatchSize) { // Create the command for the shards. MutableDocument targetedCmd(serializedCommand); if (pipeline) { targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize()); } + if (overrideBatchSize.has_value()) { + if (serializedCommand[AggregateCommandRequest::kCursorFieldName].missing()) { + targetedCmd[AggregateCommandRequest::kCursorFieldName] = + Value(DOC(SimpleCursorOptions::kBatchSizeFieldName << Value(*overrideBatchSize))); + } else { + targetedCmd[AggregateCommandRequest::kCursorFieldName] + [SimpleCursorOptions::kBatchSizeFieldName] = Value(*overrideBatchSize); + } + } + auto shardCommand = genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, @@ -987,7 +998,8 @@ DispatchShardPipelineResults dispatchShardPipeline( expCtx->explain, pipeline.get(), expCtx->getCollatorBSON(), - std::move(readConcern))); + std::move(readConcern), + boost::none)); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 45237dad4ce..0c721c2342c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -134,7 +134,8 @@ BSONObj createPassthroughCommandForShard( boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, BSONObj collationObj, - boost::optional<BSONObj> readConcern); + boost::optional<BSONObj> readConcern, + boost::optional<int> overrideBatchSize); BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx, Document serializedCommand, diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index a87ee78aa11..211715b0e1c 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -648,6 +648,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& explain, nullptr, /* pipeline */ BSONObj(), + boost::none, boost::none); const auto shardId = cm.dbPrimary(); @@ -856,10 +857,14 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte BSONObjBuilder* out) { auto opCtx = expCtx->opCtx; + boost::optional<int> overrideBatchSize; if (forPerShardCursor) { tassert(6273804, "Per shard cursors are supposed to pass fromMongos: false to shards", !expCtx->inMongos); + // By using an initial batchSize of zero all of the events will get returned through + // the getMore path and have metadata stripped out. + overrideBatchSize = 0; } // Format the command for the shard. This wraps the command as an explain if necessary, and @@ -869,7 +874,8 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte explain, nullptr, /* pipeline */ BSONObj(), - boost::none); + boost::none, + overrideBatchSize); if (!forPerShardCursor && shardId != ShardId::kConfigServerId) { cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()); |