diff options
author | Yoonsoo Kim <yoonsoo.kim@mongodb.com> | 2021-10-19 20:46:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-19 21:52:19 +0000 |
commit | 92648d2ee790daa639b0010075a9df5f2cf57dfa (patch) | |
tree | 116dbb0f35f3360c69b670b6f4d3bde5f809e36c | |
parent | 93fc544da4f7752a5ce2ab2166e94c50ea70221f (diff) | |
download | mongo-92648d2ee790daa639b0010075a9df5f2cf57dfa.tar.gz |
SERVER-59070 Support `needsMerge` behavior in $group pushed down to SBE
-rw-r--r-- | jstests/noPassthrough/agg_group.js | 88 | ||||
-rw-r--r-- | jstests/noPassthrough/profile_operation_metrics.js | 6 | ||||
-rw-r--r-- | jstests/noPassthroughWithMongod/group_pushdown.js | 63 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/expressions/expression.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.h | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_accumulator.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_helpers.h | 9 |
11 files changed, 240 insertions, 25 deletions
diff --git a/jstests/noPassthrough/agg_group.js b/jstests/noPassthrough/agg_group.js new file mode 100644 index 00000000000..008dfacc7bf --- /dev/null +++ b/jstests/noPassthrough/agg_group.js @@ -0,0 +1,88 @@ +// Tests that $group pushdown to SBE feature works in a sharded environment for some special +// scenarios. +// +// Notes: +// - In a sharded environment, the mongos splits a $group stage into two different stages. One is a +// merge $group stage at the mongos-side which does the global aggregation and the other is a $group +// stage at the shard-side which does the partial aggregation. +// - All aggregation features are tested by aggregation test suites under a sharded environment +// through passthrough tests. So, this test suite focuses on some special scenarios like for +// example, $group is pushed down to SBE at the shard-side and some accumulators may return the +// partial aggregation results in a special format to the mongos. +// +// Needs the following tag to be excluded from linux-64-duroff build variant because running +// wiredTiger without journaling in a replica set is not supported. +// @tags: [requires_sharding] +(function() { +'use strict'; + +load("jstests/libs/analyze_plan.js"); + +// As of now, $group pushdown to SBE feature is not enabled by default. So, enables it with a +// minimal configuration of a sharded cluster. +// +// TODO Remove {setParameter: "featureFlagSBEGroupPushdown=true"} when the feature is enabled by +// default. +const st = new ShardingTest( + {config: 1, shards: 1, shardOptions: {setParameter: "featureFlagSBEGroupPushdown=true"}}); + +// This database name can provide multiple similar test cases with a good separate namespace and +// each test case may create a separate collection for its own dataset. +const db = st.getDB(jsTestName()); +const dbAtShard = st.shard0.getDB(jsTestName()); + +// Makes sure that $group pushdown to SBE feature is enabled. 
+assert( + assert.commandWorked(dbAtShard.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1})) + .featureFlagSBEGroupPushdown.value); + +// Makes sure that the test db is sharded and the data is stored into the only shard. +assert.commandWorked(st.s0.adminCommand({enableSharding: db.getName()})); +st.ensurePrimaryShard(db.getName(), st.shard0.shardName); + +// A test case for a sharded $sum: Verifies that $group with $sum pushed down to SBE works in a +// sharded environment. + +let coll = db.partial_sum; + +// Makes sure that the collection is sharded. +assert.commandWorked(st.s0.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}})); + +// Prepares data for the 'NumberLong' sum result to overflow, when the shard sends back the partial +// sum as a doc with 'subTotal' and 'subTotalError' fields to the mongos. All data go to the only +// shard and so overflow will happen. +assert.commandWorked( + coll.insert([{a: 1, b: NumberLong("9223372036854775807")}, {a: 2, b: NumberLong("10")}])); + +// Turns to the classic engine at the shard before figuring out its result. +assert.commandWorked( + dbAtShard.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true})); + +// Collects the classic engine's result as the expected result, executing the pipeline at the +// mongos. +const pipeline1 = [{$group: {_id: "$a", s: {$sum: "$b"}}}]; +const classicalRes1 = + coll.runCommand({aggregate: coll.getName(), pipeline: pipeline1, cursor: {}}).cursor.firstBatch; + +// Collects the classic engine's result as the expected result, executing the pipeline at the +// mongos. +const pipeline2 = [{$group: {_id: null, s: {$sum: "$b"}}}]; +const classicalRes2 = + coll.runCommand({aggregate: coll.getName(), pipeline: pipeline2, cursor: {}}).cursor.firstBatch; + +// Turns to the SBE engine at the shard. 
+assert.commandWorked( + dbAtShard.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false})); + +// Verifies that the SBE engine's results are same as the expected results, executing the pipeline +// at the mongos. +const sbeRes1 = + coll.runCommand({aggregate: coll.getName(), pipeline: pipeline1, cursor: {}}).cursor.firstBatch; +assert.sameMembers(sbeRes1, classicalRes1); + +const sbeRes2 = + coll.runCommand({aggregate: coll.getName(), pipeline: pipeline2, cursor: {}}).cursor.firstBatch; +assert.sameMembers(sbeRes2, classicalRes2); + +st.stop(); +}()); diff --git a/jstests/noPassthrough/profile_operation_metrics.js b/jstests/noPassthrough/profile_operation_metrics.js index 871606206c6..0a7c80d88ae 100644 --- a/jstests/noPassthrough/profile_operation_metrics.js +++ b/jstests/noPassthrough/profile_operation_metrics.js @@ -20,6 +20,10 @@ const isLinux = getBuildInfo().buildEnvironment.target_os == "linux"; const isDebugBuild = (db) => { return db.adminCommand('buildInfo').debug; }; +const isGroupPushdownEnabled = (db) => { + return assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1})) + .featureFlagSBEGroupPushdown.value; +}; const assertMetricsExist = (profilerEntry) => { let metrics = profilerEntry.operationMetrics; @@ -1056,7 +1060,7 @@ const operations = [ assert.eq(profileDoc.idxEntryBytesWritten, 0); assert.eq(profileDoc.idxEntryUnitsWritten, 0); assert.eq(profileDoc.totalUnitsWritten, 0); - if (isDebugBuild(db)) { + if (isDebugBuild(db) && !isGroupPushdownEnabled(db)) { // In debug builds we sort and spill for each of the first 20 documents. Once we // reach that limit, we stop spilling as often. 
This 26 is the sum of 20 debug sorts // and spills of documents in groups 0 through 3 plus 6 debug spills and sorts for diff --git a/jstests/noPassthroughWithMongod/group_pushdown.js b/jstests/noPassthroughWithMongod/group_pushdown.js index 15c726a260f..b19245e596c 100644 --- a/jstests/noPassthroughWithMongod/group_pushdown.js +++ b/jstests/noPassthroughWithMongod/group_pushdown.js @@ -201,10 +201,7 @@ assert(explain.stages[1].hasOwnProperty("$group")); // merge $group stage at the mongos-side which does the global aggregation and the other is a $group // stage at the shard-side which does the partial aggregation. The shard-side $group stage is // requested with 'needsMerge' and 'fromMongos' flags set to true from the mongos, which we should -// block from being pushed down to SBE until we implement 'needsMerge' behavior for each -// accumulator. -// -// TODO SERVER-59070 Remove the following test case after implementing 'needsMerge' behavior. +// verify that is also pushed down and produces the correct results. explain = coll.runCommand({ aggregate: coll.getName(), explain: true, @@ -213,6 +210,60 @@ explain = coll.runCommand({ fromMongos: true, cursor: {} }); -assert.eq(null, getAggPlanStage(explain, "GROUP"), explain); -assert(explain.stages[1].hasOwnProperty("$group")); +assert.neq(null, getAggPlanStage(explain, "GROUP"), explain); + +const originalClassicEngineStatus = + assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true})) + .was; + +const pipeline1 = [{$group: {_id: "$item", s: {$sum: "$quantity"}}}]; +const classicalRes1 = coll.runCommand({ + aggregate: coll.getName(), + pipeline: pipeline1, + needsMerge: true, + fromMongos: true, + cursor: {} + }) + .cursor.firstBatch; + +// When there's overflow for 'NumberLong', the mongod sends back the partial sum as a doc with +// 'subTotal' and 'subTotalError' fields. So, we need an overflow case to verify such behavior. 
+const tcoll = db.group_pushdown1; +assert.commandWorked(tcoll.insert([{a: NumberLong("9223372036854775807")}, {a: NumberLong("10")}])); +const pipeline2 = [{$group: {_id: null, s: {$sum: "$a"}}}]; +const classicalRes2 = tcoll + .runCommand({ + aggregate: tcoll.getName(), + pipeline: pipeline2, + needsMerge: true, + fromMongos: true, + cursor: {} + }) + .cursor.firstBatch; + +assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false})); + +const sbeRes1 = coll.runCommand({ + aggregate: coll.getName(), + pipeline: pipeline1, + needsMerge: true, + fromMongos: true, + cursor: {} + }) + .cursor.firstBatch; +assert.sameMembers(sbeRes1, classicalRes1); + +const sbeRes2 = tcoll + .runCommand({ + aggregate: tcoll.getName(), + pipeline: pipeline2, + needsMerge: true, + fromMongos: true, + cursor: {} + }) + .cursor.firstBatch; +assert.docEq(sbeRes2, classicalRes2); + +assert.commandWorked(db.adminCommand( + {setParameter: 1, internalQueryForceClassicEngine: originalClassicEngineStatus})); })(); diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index e1dd5d7b205..c78c4d06b85 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -412,6 +412,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggDoubleDoubleSum, true}}, {"doubleDoubleSumFinalize", BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSumFinalize, false}}, + {"doubleDoubleMergeSumFinalize", + BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleMergeSumFinalize, false}}, {"aggStdDev", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggStdDev, true}}, {"stdDevPopFinalize", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::stdDevPopFinalize, false}}, diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp 
index a848cc7dd09..63dbea82c2b 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -966,6 +966,9 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggDoubleDouble // This function is necessary because 'aggDoubleDoubleSum()' result is 'Array' type but we need // to produce a scalar value out of it. +// +// 'keepIntegerPrecision' should be set to true when we want to keep precision for integral values. +template <bool keepIntegerPrecision> std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoubleSumFinalize( ArityType arity) { auto [_, fieldTag, fieldValue] = getFromStack(0); @@ -1008,6 +1011,25 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoubleSum value::bitcastFrom<int64_t>(longVal)}; } } + + if constexpr (keepIntegerPrecision) { + // The value was too large for a NumberInt64, so output an array with two + // values adding up to the desired total. The mongos computes the final sum, + // considering errors. + auto [total, error] = nonDecimalTotal.getDoubleDouble(); + auto llerror = static_cast<int64_t>(error); + auto [tag, val] = value::makeNewArray(); + value::ValueGuard guard(tag, val); + auto arr = value::getArrayView(val); + arr->reserve(static_cast<size_t>(AggPartialSumElems::kSizeOfArray)); + arr->push_back(value::TypeTags::NumberDouble, + value::bitcastFrom<double>(total)); + arr->push_back(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(llerror)); + guard.reset(); + return {true, tag, val}; + } + // Sum doesn't fit a NumberLong, so return a NumberDouble instead. 
[[fallthrough]]; case value::TypeTags::NumberDouble: @@ -3821,7 +3843,11 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti case Builtin::aggDoubleDoubleSum: return builtinAggDoubleDoubleSum(arity); case Builtin::doubleDoubleSumFinalize: - return builtinDoubleDoubleSumFinalize(arity); + return builtinDoubleDoubleSumFinalize<>(arity); + case Builtin::doubleDoubleMergeSumFinalize: + // This is for sharding support of aggregations that use 'doubleDoubleSum' algorithm. + // We should keep precision for integral values when the partial sum is to be merged. + return builtinDoubleDoubleSumFinalize<true /*keepIntegerPrecision*/>(arity); case Builtin::aggStdDev: return builtinAggStdDev(arity); case Builtin::stdDevPopFinalize: diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index e8e378ffec9..20aab1fdb41 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -362,6 +362,7 @@ enum class Builtin : uint8_t { doubleDoubleSum, // special double summation aggDoubleDoubleSum, doubleDoubleSumFinalize, + doubleDoubleMergeSumFinalize, aggStdDev, stdDevPopFinalize, stdDevSampFinalize, @@ -433,8 +434,8 @@ enum class Builtin : uint8_t { * - The element at index `kDecimalTotal` is optional and represents the sum of all decimal values * if any such values are encountered. * - * See 'aggDoubleDoubleSumImpl()'/'aggDoubleDoubleSum()'/'doubleDoubleSumFinalize()' for more - * details. + * See 'builtinAggDoubleDoubleSumImpl()' / 'builtinAggDoubleDoubleSum()' / + * 'builtinDoubleDoubleSumFinalize()' for more details. */ enum AggSumValueElems { kNonDecimalTotalTag, @@ -446,6 +447,14 @@ enum AggSumValueElems { }; /** + * This enum defines indices into an 'Array' that returns the partial sum result when 'needsMerge' + * is requested. + * + * See 'builtinDoubleDoubleSumFinalize()' for more details.
+ */ +enum class AggPartialSumElems { kTotal, kError, kSizeOfArray }; + +/** * This enum defines indices into an 'Array' that accumulates $stdDevPop and $stdDevSamp results. * * The array contains 3 elements: @@ -956,6 +965,8 @@ private: std::tuple<bool, value::TypeTags, value::Value> builtinCollAddToSet(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSum(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinAggDoubleDoubleSum(ArityType arity); + // This is only for compatibility with mongos/sharding and we will revisit this later. + template <bool keepIntegerPrecision = false> std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSumFinalize(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinAggStdDev(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinStdDevPopFinalize(ArityType arity); diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 99f3aabc6a7..73f3a76d81d 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -489,12 +489,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson( BSONObj groupObj(elem.Obj()); BSONObjIterator groupIterator(groupObj); VariablesParseState vps = expCtx->variablesParseState; - // The 'needsMerge' behavior is not implemented for any accumulators and so $group can't be - // pushed down to SBE when 'needsMerge' behavior is requested from the mongos. - // - // TODO SERVER-59070 Set 'sbeGroupCompatible' to true after implementing 'needsMerge' behavior - // for all accumulators. 
- expCtx->sbeGroupCompatible = !expCtx->needsMerge; + expCtx->sbeGroupCompatible = true; while (groupIterator.more()) { BSONElement groupField(groupIterator.next()); StringData pFieldName = groupField.fieldNameStringData(); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index dfc8d2abedb..ab2b3d1cc76 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -150,7 +150,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGr auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get()); if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) { // Only pushdown a prefix of group stages that are supported by sbe. - // TODO: SERVER-59070 remove the 'doingMerge' check when we support merging. break; } groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp index c6b55ad15f0..aff9af466cc 100644 --- a/src/mongo/db/query/sbe_stage_builder.cpp +++ b/src/mongo/db/query/sbe_stage_builder.cpp @@ -652,7 +652,8 @@ SlotBasedStageBuilder::SlotBasedStageBuilder(OperationContext* opCtx, _cq.getExpCtxRaw()->variables, &_slotIdGenerator, &_frameIdGenerator, - &_spoolIdGenerator) { + &_spoolIdGenerator, + _cq.getExpCtx()->needsMerge) { // SERVER-52803: In the future if we need to gather more information from the QuerySolutionNode // tree, rather than doing one-off scans for each piece of information, we should add a formal // analysis pass here. 
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp index d968fc13b0f..4792f38a4e4 100644 --- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp +++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp @@ -212,11 +212,44 @@ std::pair<std::unique_ptr<sbe::EExpression>, EvalStage> buildFinalizeSum( str::stream() << "Expected one input slot for finalization of sum, got: " << sumSlots.size(), sumSlots.size() == 1); - auto sumFinalize = - sbe::makeE<sbe::EIf>(generateNullOrMissing(makeVariable(sumSlots[0])), - makeConstant(sbe::value::TypeTags::NumberInt32, 0), - makeFunction("doubleDoubleSumFinalize", makeVariable(sumSlots[0]))); - return {std::move(sumFinalize), std::move(inputStage)}; + + if (state.needsMerge) { + // To support the sharding behavior, the mongos splits $group into two separate $group + // stages, one at the mongos-side and the other at the shard-side. The shard-side $sum + // accumulator is responsible for returning the partial sum, which is mostly in the same + // format as the global sum but, in the cases of overflowed 'NumberInt32'/'NumberInt64', is a + // sub-document {subTotal: val1, subTotalError: val2}. The builtin function for $sum + // ('builtinDoubleDoubleSumFinalize()') returns an 'Array' when there's an overflow. So, + // only when the return value is 'Array'-typed, we compose the sub-document.
+ auto sumFinalize = makeFunction("doubleDoubleMergeSumFinalize", makeVariable(sumSlots[0])); + + auto partialSumFinalize = makeLocalBind( + state.frameIdGenerator, + [](sbe::EVariable input) { + return sbe::makeE<sbe::EIf>( + makeFunction("isArray", input.clone()), + makeFunction( + "newObj", + makeConstant("subTotal"_sd), + makeFunction( + "getElement", + input.clone(), + makeConstant(sbe::value::TypeTags::NumberInt32, + static_cast<int>(sbe::vm::AggPartialSumElems::kTotal))), + makeConstant("subTotalError"_sd), + makeFunction( + "getElement", + input.clone(), + makeConstant(sbe::value::TypeTags::NumberInt32, + static_cast<int>(sbe::vm::AggPartialSumElems::kError)))), + input.clone()); + }, + std::move(sumFinalize)); + return {std::move(partialSumFinalize), std::move(inputStage)}; + } else { + auto sumFinalize = makeFunction("doubleDoubleSumFinalize", makeVariable(sumSlots[0])); + return {std::move(sumFinalize), std::move(inputStage)}; + } } std::pair<std::vector<std::unique_ptr<sbe::EExpression>>, EvalStage> buildAccumulatorAddToSet( diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h index 46f41471ac0..e9b638c3b0c 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.h +++ b/src/mongo/db/query/sbe_stage_builder_helpers.h @@ -805,13 +805,15 @@ struct StageBuilderState { const Variables& variables, sbe::value::SlotIdGenerator* slotIdGenerator, sbe::value::FrameIdGenerator* frameIdGenerator, - sbe::value::SpoolIdGenerator* spoolIdGenerator) + sbe::value::SpoolIdGenerator* spoolIdGenerator, + bool needsMerge) : slotIdGenerator{slotIdGenerator}, frameIdGenerator{frameIdGenerator}, spoolIdGenerator{spoolIdGenerator}, opCtx{opCtx}, env{env}, - variables{variables} {} + variables{variables}, + needsMerge{needsMerge} {} StageBuilderState(const StageBuilderState& other) = delete; @@ -838,6 +840,9 @@ struct StageBuilderState { const Variables& variables; stdx::unordered_map<Variables::Id, sbe::value::SlotId> 
globalVariables; + // When the mongos splits $group stage and sends it to shards, it sets the 'needsMerge'/'fromMongos' + // flags to true so that shards can send special partial aggregation results to the mongos. + bool needsMerge; }; } // namespace mongo::stage_builder |