summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-03-03 21:57:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-03 22:28:56 +0000
commitef2a62dcc27461d2be1b619c75bc04effa1f2021 (patch)
tree3f502ae6d6f8da7b79d16ced54157492c39700ca /src
parenta90eac974885febfc2499ca4c431e1d0ac20270e (diff)
downloadmongo-ef2a62dcc27461d2be1b619c75bc04effa1f2021.tar.gz
SERVER-63781 set per shard cursor initial batch size to 0
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp16
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h3
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp8
3 files changed, 23 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index a7ba134a672..45106690b43 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -816,13 +816,24 @@ BSONObj createPassthroughCommandForShard(
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
BSONObj collationObj,
- boost::optional<BSONObj> readConcern) {
+ boost::optional<BSONObj> readConcern,
+ boost::optional<int> overrideBatchSize) {
// Create the command for the shards.
MutableDocument targetedCmd(serializedCommand);
if (pipeline) {
targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize());
}
+ if (overrideBatchSize.has_value()) {
+ if (serializedCommand[AggregateCommandRequest::kCursorFieldName].missing()) {
+ targetedCmd[AggregateCommandRequest::kCursorFieldName] =
+ Value(DOC(SimpleCursorOptions::kBatchSizeFieldName << Value(*overrideBatchSize)));
+ } else {
+ targetedCmd[AggregateCommandRequest::kCursorFieldName]
+ [SimpleCursorOptions::kBatchSizeFieldName] = Value(*overrideBatchSize);
+ }
+ }
+
auto shardCommand = genericTransformForShards(std::move(targetedCmd),
expCtx,
explainVerbosity,
@@ -987,7 +998,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
expCtx->explain,
pipeline.get(),
expCtx->getCollatorBSON(),
- std::move(readConcern)));
+ std::move(readConcern),
+ boost::none));
// A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
// config server in order to monitor for new shards. To guarantee that we do not miss any
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 45237dad4ce..0c721c2342c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -134,7 +134,8 @@ BSONObj createPassthroughCommandForShard(
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
BSONObj collationObj,
- boost::optional<BSONObj> readConcern);
+ boost::optional<BSONObj> readConcern,
+ boost::optional<int> overrideBatchSize);
BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Document serializedCommand,
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index a87ee78aa11..211715b0e1c 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -648,6 +648,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
explain,
nullptr, /* pipeline */
BSONObj(),
+ boost::none,
boost::none);
const auto shardId = cm.dbPrimary();
@@ -856,10 +857,14 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
BSONObjBuilder* out) {
auto opCtx = expCtx->opCtx;
+ boost::optional<int> overrideBatchSize;
if (forPerShardCursor) {
tassert(6273804,
"Per shard cursors are supposed to pass fromMongos: false to shards",
!expCtx->inMongos);
+ // By using an initial batchSize of zero all of the events will get returned through
+ // the getMore path and have metadata stripped out.
+ overrideBatchSize = 0;
}
// Format the command for the shard. This wraps the command as an explain if necessary, and
@@ -869,7 +874,8 @@ Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionConte
explain,
nullptr, /* pipeline */
BSONObj(),
- boost::none);
+ boost::none,
+ overrideBatchSize);
if (!forPerShardCursor && shardId != ShardId::kConfigServerId) {
cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED());