author     Yoonsoo Kim <yoonsoo.kim@mongodb.com>            2021-10-19 20:46:22 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-10-19 21:52:19 +0000
commit     92648d2ee790daa639b0010075a9df5f2cf57dfa (patch)
tree       116dbb0f35f3360c69b670b6f4d3bde5f809e36c
parent     93fc544da4f7752a5ce2ab2166e94c50ea70221f (diff)
SERVER-59070 Support `needsMerge` behavior in $group pushed down to SBE
 jstests/noPassthrough/agg_group.js                   | 88
 jstests/noPassthrough/profile_operation_metrics.js   |  6
 jstests/noPassthroughWithMongod/group_pushdown.js    | 63
 src/mongo/db/exec/sbe/expressions/expression.cpp     |  2
 src/mongo/db/exec/sbe/vm/vm.cpp                      | 28
 src/mongo/db/exec/sbe/vm/vm.h                        | 15
 src/mongo/db/pipeline/document_source_group.cpp      |  7
 src/mongo/db/pipeline/pipeline_d.cpp                 |  1
 src/mongo/db/query/sbe_stage_builder.cpp             |  3
 src/mongo/db/query/sbe_stage_builder_accumulator.cpp | 43
 src/mongo/db/query/sbe_stage_builder_helpers.h       |  9
 11 files changed, 240 insertions(+), 25 deletions(-)
diff --git a/jstests/noPassthrough/agg_group.js b/jstests/noPassthrough/agg_group.js
new file mode 100644
index 00000000000..008dfacc7bf
--- /dev/null
+++ b/jstests/noPassthrough/agg_group.js
@@ -0,0 +1,88 @@
+// Tests that the $group pushdown to SBE feature works in a sharded environment for some special
+// scenarios.
+//
+// Notes:
+// - In a sharded environment, the mongos splits a $group stage into two different stages: a merge
+// $group stage on the mongos side which does the global aggregation, and a $group stage on the
+// shard side which does the partial aggregation.
+// - All aggregation features are already tested in a sharded environment by the aggregation test
+// suites through passthrough tests, so this test focuses on special scenarios, for example, when
+// $group is pushed down to SBE on the shard side and some accumulators return the partial
+// aggregation results to the mongos in a special format.
+//
+// The following tag excludes this test from the linux-64-duroff build variant because running
+// wiredTiger without journaling in a replica set is not supported.
+// @tags: [requires_sharding]
+(function() {
+'use strict';
+
+load("jstests/libs/analyze_plan.js");
+
+// As of now, the $group pushdown to SBE feature is not enabled by default, so enable it with a
+// minimal configuration of a sharded cluster.
+//
+// TODO Remove {setParameter: "featureFlagSBEGroupPushdown=true"} when the feature is enabled by
+// default.
+const st = new ShardingTest(
+ {config: 1, shards: 1, shardOptions: {setParameter: "featureFlagSBEGroupPushdown=true"}});
+
+// Using the test name as the database name gives similar test cases a cleanly separated
+// namespace, and each test case may create a separate collection for its own dataset.
+const db = st.getDB(jsTestName());
+const dbAtShard = st.shard0.getDB(jsTestName());
+
+// Makes sure that the $group pushdown to SBE feature is enabled.
+assert(
+ assert.commandWorked(dbAtShard.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
+ .featureFlagSBEGroupPushdown.value);
+
+// Makes sure that the test db is sharded and that the data is stored in the only shard.
+assert.commandWorked(st.s0.adminCommand({enableSharding: db.getName()}));
+st.ensurePrimaryShard(db.getName(), st.shard0.shardName);
+
+// A test case for a sharded $sum: Verifies that $group with $sum pushed down to SBE works in a
+// sharded environment.
+
+let coll = db.partial_sum;
+
+// Makes sure that the collection is sharded.
+assert.commandWorked(st.s0.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
+
+// Prepares data so that the 'NumberLong' sum result overflows, in which case the shard sends back
+// the partial sum to the mongos as a doc with 'subTotal' and 'subTotalError' fields. All data go
+// to the only shard, so the overflow is guaranteed to happen.
+assert.commandWorked(
+ coll.insert([{a: 1, b: NumberLong("9223372036854775807")}, {a: 2, b: NumberLong("10")}]));
+
+// Switches to the classic engine at the shard before collecting the expected results.
+assert.commandWorked(
+ dbAtShard.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true}));
+
+// Collects the classic engine's result as the expected result, executing the pipeline at the
+// mongos.
+const pipeline1 = [{$group: {_id: "$a", s: {$sum: "$b"}}}];
+const classicalRes1 =
+ coll.runCommand({aggregate: coll.getName(), pipeline: pipeline1, cursor: {}}).cursor.firstBatch;
+
+// Collects the classic engine's result for the single-group pipeline as well, again executing it
+// at the mongos.
+const pipeline2 = [{$group: {_id: null, s: {$sum: "$b"}}}];
+const classicalRes2 =
+ coll.runCommand({aggregate: coll.getName(), pipeline: pipeline2, cursor: {}}).cursor.firstBatch;
+
+// Switches back to the SBE engine at the shard.
+assert.commandWorked(
+ dbAtShard.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false}));
+
+// Verifies that the SBE engine's results are the same as the expected results, executing the
+// pipeline at the mongos.
+const sbeRes1 =
+ coll.runCommand({aggregate: coll.getName(), pipeline: pipeline1, cursor: {}}).cursor.firstBatch;
+assert.sameMembers(sbeRes1, classicalRes1);
+
+const sbeRes2 =
+ coll.runCommand({aggregate: coll.getName(), pipeline: pipeline2, cursor: {}}).cursor.firstBatch;
+assert.sameMembers(sbeRes2, classicalRes2);
+
+st.stop();
+}());
diff --git a/jstests/noPassthrough/profile_operation_metrics.js b/jstests/noPassthrough/profile_operation_metrics.js
index 871606206c6..0a7c80d88ae 100644
--- a/jstests/noPassthrough/profile_operation_metrics.js
+++ b/jstests/noPassthrough/profile_operation_metrics.js
@@ -20,6 +20,10 @@ const isLinux = getBuildInfo().buildEnvironment.target_os == "linux";
const isDebugBuild = (db) => {
return db.adminCommand('buildInfo').debug;
};
+const isGroupPushdownEnabled = (db) => {
+ return assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
+ .featureFlagSBEGroupPushdown.value;
+};
const assertMetricsExist = (profilerEntry) => {
let metrics = profilerEntry.operationMetrics;
@@ -1056,7 +1060,7 @@ const operations = [
assert.eq(profileDoc.idxEntryBytesWritten, 0);
assert.eq(profileDoc.idxEntryUnitsWritten, 0);
assert.eq(profileDoc.totalUnitsWritten, 0);
- if (isDebugBuild(db)) {
+ if (isDebugBuild(db) && !isGroupPushdownEnabled(db)) {
// In debug builds we sort and spill for each of the first 20 documents. Once we
// reach that limit, we stop spilling as often. This 26 is the sum of 20 debug sorts
// and spills of documents in groups 0 through 3 plus 6 debug spills and sorts for
diff --git a/jstests/noPassthroughWithMongod/group_pushdown.js b/jstests/noPassthroughWithMongod/group_pushdown.js
index 15c726a260f..b19245e596c 100644
--- a/jstests/noPassthroughWithMongod/group_pushdown.js
+++ b/jstests/noPassthroughWithMongod/group_pushdown.js
@@ -201,10 +201,7 @@ assert(explain.stages[1].hasOwnProperty("$group"));
// merge $group stage at the mongos-side which does the global aggregation and the other is a $group
// stage at the shard-side which does the partial aggregation. The shard-side $group stage is
// requested with 'needsMerge' and 'fromMongos' flags set to true from the mongos, which we should
-// block from being pushed down to SBE until we implement 'needsMerge' behavior for each
-// accumulator.
-//
-// TODO SERVER-59070 Remove the following test case after implementing 'needsMerge' behavior.
+// verify is also pushed down to SBE and produces the correct results.
explain = coll.runCommand({
aggregate: coll.getName(),
explain: true,
@@ -213,6 +210,60 @@ explain = coll.runCommand({
fromMongos: true,
cursor: {}
});
-assert.eq(null, getAggPlanStage(explain, "GROUP"), explain);
-assert(explain.stages[1].hasOwnProperty("$group"));
+assert.neq(null, getAggPlanStage(explain, "GROUP"), explain);
+
+const originalClassicEngineStatus =
+ assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true}))
+ .was;
+
+const pipeline1 = [{$group: {_id: "$item", s: {$sum: "$quantity"}}}];
+const classicalRes1 = coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: pipeline1,
+ needsMerge: true,
+ fromMongos: true,
+ cursor: {}
+ })
+ .cursor.firstBatch;
+
+// When the 'NumberLong' sum overflows, the mongod sends back the partial sum as a doc with
+// 'subTotal' and 'subTotalError' fields, so we need an overflow case to verify that behavior.
+const tcoll = db.group_pushdown1;
+assert.commandWorked(tcoll.insert([{a: NumberLong("9223372036854775807")}, {a: NumberLong("10")}]));
+const pipeline2 = [{$group: {_id: null, s: {$sum: "$a"}}}];
+const classicalRes2 = tcoll
+ .runCommand({
+ aggregate: tcoll.getName(),
+ pipeline: pipeline2,
+ needsMerge: true,
+ fromMongos: true,
+ cursor: {}
+ })
+ .cursor.firstBatch;
+
+assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false}));
+
+const sbeRes1 = coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: pipeline1,
+ needsMerge: true,
+ fromMongos: true,
+ cursor: {}
+ })
+ .cursor.firstBatch;
+assert.sameMembers(sbeRes1, classicalRes1);
+
+const sbeRes2 = tcoll
+ .runCommand({
+ aggregate: tcoll.getName(),
+ pipeline: pipeline2,
+ needsMerge: true,
+ fromMongos: true,
+ cursor: {}
+ })
+ .cursor.firstBatch;
+assert.docEq(sbeRes2, classicalRes2);
+
+assert.commandWorked(db.adminCommand(
+ {setParameter: 1, internalQueryForceClassicEngine: originalClassicEngineStatus}));
})();
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index e1dd5d7b205..c78c4d06b85 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -412,6 +412,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggDoubleDoubleSum, true}},
{"doubleDoubleSumFinalize",
BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSumFinalize, false}},
+ {"doubleDoubleMergeSumFinalize",
+ BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleMergeSumFinalize, false}},
{"aggStdDev", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggStdDev, true}},
{"stdDevPopFinalize",
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::stdDevPopFinalize, false}},
diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp
index a848cc7dd09..63dbea82c2b 100644
--- a/src/mongo/db/exec/sbe/vm/vm.cpp
+++ b/src/mongo/db/exec/sbe/vm/vm.cpp
@@ -966,6 +966,9 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggDoubleDouble
// This function is necessary because 'aggDoubleDoubleSum()' result is 'Array' type but we need
// to produce a scalar value out of it.
+//
+// 'keepIntegerPrecision' should be set to true when we want to keep precision for integral values.
+template <bool keepIntegerPrecision>
std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoubleSumFinalize(
ArityType arity) {
auto [_, fieldTag, fieldValue] = getFromStack(0);
@@ -1008,6 +1011,25 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoubleSum
value::bitcastFrom<int64_t>(longVal)};
}
}
+
+ if constexpr (keepIntegerPrecision) {
+ // The value was too large for a NumberInt64, so output an array with two
+ // values adding up to the desired total. The mongos computes the final sum,
+ // considering errors.
+ auto [total, error] = nonDecimalTotal.getDoubleDouble();
+ auto llerror = static_cast<int64_t>(error);
+ auto [tag, val] = value::makeNewArray();
+ value::ValueGuard guard(tag, val);
+ auto arr = value::getArrayView(val);
+ arr->reserve(static_cast<size_t>(AggPartialSumElems::kSizeOfArray));
+ arr->push_back(value::TypeTags::NumberDouble,
+ value::bitcastFrom<double>(total));
+ arr->push_back(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(llerror));
+ guard.reset();
+ return {true, tag, val};
+ }
+
// Sum doesn't fit a NumberLong, so return a NumberDouble instead.
[[fallthrough]];
case value::TypeTags::NumberDouble:
@@ -3821,7 +3843,11 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti
case Builtin::aggDoubleDoubleSum:
return builtinAggDoubleDoubleSum(arity);
case Builtin::doubleDoubleSumFinalize:
- return builtinDoubleDoubleSumFinalize(arity);
+ return builtinDoubleDoubleSumFinalize<>(arity);
+ case Builtin::doubleDoubleMergeSumFinalize:
+ // This is for sharding support of aggregations that use the 'doubleDoubleSum' algorithm.
+ // We must keep precision for integral values when the partial sum is to be merged.
+ return builtinDoubleDoubleSumFinalize<true /*keepIntegerPrecision*/>(arity);
case Builtin::aggStdDev:
return builtinAggStdDev(arity);
case Builtin::stdDevPopFinalize:
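The '(total, error)' pair that 'nonDecimalTotal.getDoubleDouble()' yields above comes from the DoubleDouble summation state maintained by 'aggDoubleDoubleSum'. As a rough model of that technique, here is a compensated (Neumaier-style) summation sketch in shell JavaScript; it illustrates the idea and is not the server's actual C++ implementation:

// 'total' is the running double sum; 'error' accumulates the low-order bits that each
// addition rounds away, so total + error tracks the exact sum far more closely than
// 'total' alone.
function compensatedSum(values) {
    let total = 0, error = 0;
    for (const v of values) {
        const t = total + v;
        if (Math.abs(total) >= Math.abs(v)) {
            error += (total - t) + v;  // v's low-order bits were rounded off
        } else {
            error += (v - t) + total;  // total's low-order bits were rounded off
        }
        total = t;
    }
    return [total, error];  // cf. AggPartialSumElems: [kTotal, kError]
}
// Example: the 9 vanishes from 'total' alone but survives in 'error':
// compensatedSum([Math.pow(2, 63), 9]) returns [9223372036854775808, 9].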
diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h
index e8e378ffec9..20aab1fdb41 100644
--- a/src/mongo/db/exec/sbe/vm/vm.h
+++ b/src/mongo/db/exec/sbe/vm/vm.h
@@ -362,6 +362,7 @@ enum class Builtin : uint8_t {
doubleDoubleSum, // special double summation
aggDoubleDoubleSum,
doubleDoubleSumFinalize,
+ doubleDoubleMergeSumFinalize,
aggStdDev,
stdDevPopFinalize,
stdDevSampFinalize,
@@ -433,8 +434,8 @@ enum class Builtin : uint8_t {
* - The element at index `kDecimalTotal` is optional and represents the sum of all decimal values
* if any such values are encountered.
*
- * See 'aggDoubleDoubleSumImpl()'/'aggDoubleDoubleSum()'/'doubleDoubleSumFinalize()' for more
- * details.
+ * See 'builtinAggDoubleDoubleSumImpl()' / 'builtinAggDoubleDoubleSum()' /
+ * 'builtinDoubleDoubleSumFinalize()' for more details.
*/
enum AggSumValueElems {
kNonDecimalTotalTag,
@@ -446,6 +447,14 @@ enum AggSumValueElems {
};
/**
+ * This enum defines indices into the 'Array' that holds the partial sum result when 'needsMerge'
+ * is requested.
+ *
+ * See 'builtinDoubleDoubleSumFinalize()' for more details.
+ */
+enum class AggPartialSumElems { kTotal, kError, kSizeOfArray };
+
+/**
* This enum defines indices into an 'Array' that accumulates $stdDevPop and $stdDevSamp results.
*
* The array contains 3 elements:
@@ -956,6 +965,8 @@ private:
std::tuple<bool, value::TypeTags, value::Value> builtinCollAddToSet(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSum(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinAggDoubleDoubleSum(ArityType arity);
+ // The 'keepIntegerPrecision' parameter exists only for compatibility with mongos/sharding;
+ // we will revisit this later.
+ template <bool keepIntegerPrecision = false>
std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSumFinalize(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinAggStdDev(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinStdDevPopFinalize(ArityType arity);
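To make the two array layouts concrete, a short annotated sketch; the contents are simplified, and slot 'kNonDecimalTotalTag' really holds an SBE type tag, shown here as a placeholder string:

// Running $sum state, indexed by AggSumValueElems, for an int64 stream whose exact
// total is 2^63 + 9: [tag of the widest type seen, double sum, compensation addend].
const sumState = ["<NumberInt64 tag>", 9223372036854775808, 9];
// Finalized partial-sum result, indexed by AggPartialSumElems:
const kTotal = 0, kError = 1;
const partialSum = [9223372036854775808, 9];
// The merge-aware finalize step renders this as
// {subTotal: partialSum[kTotal], subTotalError: partialSum[kError]}.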
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 99f3aabc6a7..73f3a76d81d 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -489,12 +489,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
BSONObj groupObj(elem.Obj());
BSONObjIterator groupIterator(groupObj);
VariablesParseState vps = expCtx->variablesParseState;
- // The 'needsMerge' behavior is not implemented for any accumulators and so $group can't be
- // pushed down to SBE when 'needsMerge' behavior is requested from the mongos.
- //
- // TODO SERVER-59070 Set 'sbeGroupCompatible' to true after implementing 'needsMerge' behavior
- // for all accumulators.
- expCtx->sbeGroupCompatible = !expCtx->needsMerge;
+ expCtx->sbeGroupCompatible = true;
while (groupIterator.more()) {
BSONElement groupField(groupIterator.next());
StringData pFieldName = groupField.fieldNameStringData();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index dfc8d2abedb..ab2b3d1cc76 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -150,7 +150,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGr
auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get());
if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) {
// Only pushdown a prefix of group stages that are supported by sbe.
- // TODO: SERVER-59070 remove the 'doingMerge' check when we support merging.
break;
}
groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp
index c6b55ad15f0..aff9af466cc 100644
--- a/src/mongo/db/query/sbe_stage_builder.cpp
+++ b/src/mongo/db/query/sbe_stage_builder.cpp
@@ -652,7 +652,8 @@ SlotBasedStageBuilder::SlotBasedStageBuilder(OperationContext* opCtx,
_cq.getExpCtxRaw()->variables,
&_slotIdGenerator,
&_frameIdGenerator,
- &_spoolIdGenerator) {
+ &_spoolIdGenerator,
+ _cq.getExpCtx()->needsMerge) {
// SERVER-52803: In the future if we need to gather more information from the QuerySolutionNode
// tree, rather than doing one-off scans for each piece of information, we should add a formal
// analysis pass here.
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
index d968fc13b0f..4792f38a4e4 100644
--- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
@@ -212,11 +212,44 @@ std::pair<std::unique_ptr<sbe::EExpression>, EvalStage> buildFinalizeSum(
str::stream() << "Expected one input slot for finalization of sum, got: "
<< sumSlots.size(),
sumSlots.size() == 1);
- auto sumFinalize =
- sbe::makeE<sbe::EIf>(generateNullOrMissing(makeVariable(sumSlots[0])),
- makeConstant(sbe::value::TypeTags::NumberInt32, 0),
- makeFunction("doubleDoubleSumFinalize", makeVariable(sumSlots[0])));
- return {std::move(sumFinalize), std::move(inputStage)};
+
+ if (state.needsMerge) {
+ // To support the sharding behavior, the mongos splits $group into two separate $group
+ // stages, one at the mongos side and the other at the shard side. The shard-side $sum
+ // accumulator is responsible for returning the partial sum, which is mostly in the same
+ // format as the global sum, except that in the case of an overflowed
+ // 'NumberInt32'/'NumberInt64' it returns a sub-document {subTotal: val1, subTotalError: val2}.
+ // The builtin function for $sum ('builtinDoubleDoubleSumFinalize()') returns an 'Array' when
+ // there's an overflow, so we compose the sub-document only when the return value is
+ // 'Array'-typed.
+ auto sumFinalize = makeFunction("doubleDoubleMergeSumFinalize", makeVariable(sumSlots[0]));
+
+ auto partialSumFinalize = makeLocalBind(
+ state.frameIdGenerator,
+ [](sbe::EVariable input) {
+ return sbe::makeE<sbe::EIf>(
+ makeFunction("isArray", input.clone()),
+ makeFunction(
+ "newObj",
+ makeConstant("subTotal"_sd),
+ makeFunction(
+ "getElement",
+ input.clone(),
+ makeConstant(sbe::value::TypeTags::NumberInt32,
+ static_cast<int>(sbe::vm::AggPartialSumElems::kTotal))),
+ makeConstant("subTotalError"_sd),
+ makeFunction(
+ "getElement",
+ input.clone(),
+ makeConstant(sbe::value::TypeTags::NumberInt32,
+ static_cast<int>(sbe::vm::AggPartialSumElems::kError)))),
+ input.clone());
+ },
+ std::move(sumFinalize));
+ return {std::move(partialSumFinalize), std::move(inputStage)};
+ } else {
+ auto sumFinalize = makeFunction("doubleDoubleSumFinalize", makeVariable(sumSlots[0]));
+ return {std::move(sumFinalize), std::move(inputStage)};
+ }
}
std::pair<std::vector<std::unique_ptr<sbe::EExpression>>, EvalStage> buildAccumulatorAddToSet(
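In plain terms, the expression tree that 'buildFinalizeSum()' assembles for the 'needsMerge' case behaves like the following JavaScript rendering, where 'finalized' stands for the output of the 'doubleDoubleMergeSumFinalize' builtin:

// Pass scalars through untouched; when the builtin signalled overflow by returning a
// [total, error] array, wrap it into the sub-document the mongos expects.
function finalizePartialSum(finalized) {
    const kTotal = 0, kError = 1;  // AggPartialSumElems
    if (Array.isArray(finalized)) {
        return {subTotal: finalized[kTotal], subTotalError: finalized[kError]};
    }
    return finalized;  // no overflow: already a scalar partial sum
}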
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h
index 46f41471ac0..e9b638c3b0c 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.h
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.h
@@ -805,13 +805,15 @@ struct StageBuilderState {
const Variables& variables,
sbe::value::SlotIdGenerator* slotIdGenerator,
sbe::value::FrameIdGenerator* frameIdGenerator,
- sbe::value::SpoolIdGenerator* spoolIdGenerator)
+ sbe::value::SpoolIdGenerator* spoolIdGenerator,
+ bool needsMerge)
: slotIdGenerator{slotIdGenerator},
frameIdGenerator{frameIdGenerator},
spoolIdGenerator{spoolIdGenerator},
opCtx{opCtx},
env{env},
- variables{variables} {}
+ variables{variables},
+ needsMerge{needsMerge} {}
StageBuilderState(const StageBuilderState& other) = delete;
@@ -838,6 +840,9 @@ struct StageBuilderState {
const Variables& variables;
stdx::unordered_map<Variables::Id, sbe::value::SlotId> globalVariables;
+ // When the mongos splits the $group stage and sends it to the shards, it sets the
+ // 'needsMerge'/'fromMongos' flags to true so that the shards can send special partial
+ // aggregation results to the mongos.
+ bool needsMerge;
};
} // namespace mongo::stage_builder