diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.h | 3 |
2 files changed, 16 insertions, 3 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index a7ba134a672..45106690b43 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -816,13 +816,24 @@ BSONObj createPassthroughCommandForShard( boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, BSONObj collationObj, - boost::optional<BSONObj> readConcern) { + boost::optional<BSONObj> readConcern, + boost::optional<int> overrideBatchSize) { // Create the command for the shards. MutableDocument targetedCmd(serializedCommand); if (pipeline) { targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize()); } + if (overrideBatchSize.has_value()) { + if (serializedCommand[AggregateCommandRequest::kCursorFieldName].missing()) { + targetedCmd[AggregateCommandRequest::kCursorFieldName] = + Value(DOC(SimpleCursorOptions::kBatchSizeFieldName << Value(*overrideBatchSize))); + } else { + targetedCmd[AggregateCommandRequest::kCursorFieldName] + [SimpleCursorOptions::kBatchSizeFieldName] = Value(*overrideBatchSize); + } + } + auto shardCommand = genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, @@ -987,7 +998,8 @@ DispatchShardPipelineResults dispatchShardPipeline( expCtx->explain, pipeline.get(), expCtx->getCollatorBSON(), - std::move(readConcern))); + std::move(readConcern), + boost::none)); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 45237dad4ce..0c721c2342c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -134,7 +134,8 @@ BSONObj createPassthroughCommandForShard( boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, BSONObj collationObj, - boost::optional<BSONObj> readConcern); + boost::optional<BSONObj> readConcern, + boost::optional<int> overrideBatchSize); BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx, Document serializedCommand, |