diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2021-07-14 21:03:52 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-22 17:22:20 +0000 |
commit | f5afeb9966a9a58f26347c3670bd42f36560eb8a (patch) | |
tree | ff175d6bb64d3fb9b4c3f80e936b94c8e2f2b445 | |
parent | 491301856f66409cbc57106d785153f40316a44a (diff) | |
download | mongo-f5afeb9966a9a58f26347c3670bd42f36560eb8a.tar.gz |
SERVER-58431 Modify HashAgg stage in preparation for $lookup + $group pushdown
Implemented optinal seek keys used to lookup a particular key in the hash table.
-rw-r--r-- | src/mongo/db/exec/sbe/parser/parser.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.cpp | 138 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.h | 19 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_helpers.cpp | 9 |
6 files changed, 208 insertions, 38 deletions
diff --git a/src/mongo/db/exec/sbe/parser/parser.cpp b/src/mongo/db/exec/sbe/parser/parser.cpp index 6fd95a5d720..1679d0b900f 100644 --- a/src/mongo/db/exec/sbe/parser/parser.cpp +++ b/src/mongo/db/exec/sbe/parser/parser.cpp @@ -1129,6 +1129,8 @@ void Parser::walkGroup(AstQuery& ast) { std::move(ast.nodes[inputPos]->stage), lookupSlots(std::move(ast.nodes[0]->identifiers)), lookupSlots(std::move(ast.nodes[1]->projects)), + makeSV(), + true, collatorSlotPos ? lookupSlot(std::move(ast.nodes[collatorSlotPos]->identifier)) : boost::none, getCurrentPlanNodeId()); diff --git a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp index 29532022114..e75dced43cc 100644 --- a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp +++ b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp @@ -358,6 +358,8 @@ protected: sbe::value::SlotId{3}, stage_builder::makeFunction( "max", sbe::makeE<sbe::EVariable>(sbe::value::SlotId{1}))), + sbe::makeSV(), + true, boost::none, /* optional collator slot */ planNodeId), // GROUP with a collator slot. @@ -370,6 +372,8 @@ protected: sbe::value::SlotId{3}, stage_builder::makeFunction( "max", sbe::makeE<sbe::EVariable>(sbe::value::SlotId{1}))), + sbe::makeSV(), + true, sbe::value::SlotId{4}, /* optional collator slot */ planNodeId), // LIMIT diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp index f8972a44c87..35467329b18 100644 --- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp @@ -81,6 +81,8 @@ TEST_F(HashAggStageTest, HashAggMinMaxTest) { collMaxSlot, stage_builder::makeFunction( "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))), + makeSV(), + true, boost::none, kEmptyPlanNodeId); @@ -135,6 +137,8 @@ TEST_F(HashAggStageTest, HashAggAddToSetTest) { makeEM(hashAggSlot, stage_builder::makeFunction( "collAddToSet", std::move(collExpr), makeE<EVariable>(scanSlot))), + makeSV(), + true, boost::none, kEmptyPlanNodeId); @@ -219,6 +223,8 @@ TEST_F(HashAggStageTest, HashAggCollationTest) { "sum", makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), + true, boost::optional<value::SlotId>{useCollator, collatorSlot}, kEmptyPlanNodeId); @@ -282,4 +288,72 @@ TEST_F(HashAggStageTest, HashAggCollationTest) { } } +TEST_F(HashAggStageTest, HashAggSeekKeysTest) { + auto ctx = makeCompileCtx(); + + // Create a seek slot we will use to peek into the hash table. + auto seekSlot = generateSlotId(); + value::OwnedValueAccessor seekAccessor; + ctx->pushCorrelated(seekSlot, &seekAccessor); + + // Build a scan of the [5,6,7,5,6,7,6,7,7] input array. + auto [inputTag, inputVal] = + stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7)); + auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal); + + + auto [outputSlot, stage] = [this, seekSlot](value::SlotId scanSlot, + std::unique_ptr<PlanStage> scanStage) { + // Build a HashAggStage, group by the scanSlot and compute a simple count. + auto countsSlot = generateSlotId(); + + auto hashAggStage = makeS<HashAggStage>( + std::move(scanStage), + makeSV(scanSlot), + makeEM(countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), + makeSV(seekSlot), + true, + boost::none, + kEmptyPlanNodeId); + + return std::make_pair(countsSlot, std::move(hashAggStage)); + }(scanSlot, std::move(scanStage)); + + // Let's start with '5' as our seek value. + seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(5)); + + // Prepare the tree and get the `SlotAccessor` for the output slot. + auto resultAccessor = prepareTree(ctx.get(), stage.get(), outputSlot); + ctx->popCorrelated(); + + ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); + auto [res1Tag, res1Val] = resultAccessor->getViewOfValue(); + // There are '2' occurences of '5' in the input. + assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2)); + ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + + // Reposition to '6'. + seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(6)); + stage->open(true); + ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); + auto [res2Tag, res2Val] = resultAccessor->getViewOfValue(); + // There are '3' occurences of '6' in the input. + assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3)); + ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + + // Reposition to '7'. + seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(7)); + stage->open(true); + ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); + auto [res3Tag, res3Val] = resultAccessor->getViewOfValue(); + // There are '4' occurences of '7' in the input. + assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4)); + ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + + stage->close(); +} + } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index 90cae74c3ca..0ceb588eea0 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -38,13 +38,21 @@ namespace sbe { HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, value::SlotMap<std::unique_ptr<EExpression>> aggs, + value::SlotVector seekKeysSlots, + bool optimizedClose, boost::optional<value::SlotId> collatorSlot, PlanNodeId planNodeId) : PlanStage("group"_sd, planNodeId), _gbs(std::move(gbs)), _aggs(std::move(aggs)), - _collatorSlot(collatorSlot) { + _collatorSlot(collatorSlot), + _seekKeysSlots(std::move(seekKeysSlots)), + _optimizedClose(optimizedClose) { _children.emplace_back(std::move(input)); + invariant(_seekKeysSlots.empty() || _seekKeysSlots.size() == _gbs.size()); + tassert(5843100, + "HashAgg stage was given optimizedClose=false and seek keys", + _seekKeysSlots.empty() || _optimizedClose); } std::unique_ptr<PlanStage> HashAggStage::clone() const { @@ -52,8 +60,13 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const { for (auto& [k, v] : _aggs) { aggs.emplace(k, v->clone()); } - return std::make_unique<HashAggStage>( - _children[0]->clone(), _gbs, std::move(aggs), _collatorSlot, _commonStats.nodeId); + return std::make_unique<HashAggStage>(_children[0]->clone(), + _gbs, + std::move(aggs), + _seekKeysSlots, + _optimizedClose, + _collatorSlot, + _commonStats.nodeId); } void HashAggStage::prepare(CompileCtx& ctx) { @@ -78,6 +91,12 @@ void HashAggStage::prepare(CompileCtx& ctx) { _outAccessors[slot] = _outKeyAccessors.back().get(); } + // Process seek keys (if any). The keys must come from outside of the subtree (by definition) so + // we go directly to the compilation context. + for (auto& slot : _seekKeysSlots) { + _seekKeysAccessors.emplace_back(ctx.getAccessor(slot)); + } + counter = 0; for (auto& [slot, expr] : _aggs) { auto [it, inserted] = dupCheck.emplace(slot); @@ -116,45 +135,64 @@ void HashAggStage::open(bool reOpen) { auto optTimer(getOptTimer(_opCtx)); _commonStats.opens++; - _children[0]->open(reOpen); - - if (_collatorAccessor) { - auto [tag, collatorVal] = _collatorAccessor->getViewOfValue(); - uassert(5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator); - auto collatorView = value::getCollatorView(collatorVal); - const value::MaterializedRowHasher hasher(collatorView); - const value::MaterializedRowEq equator(collatorView); - _ht.emplace(0, hasher, equator); - } else { - _ht.emplace(); - } - while (_children[0]->getNext() == PlanState::ADVANCED) { - value::MaterializedRow key{_inKeyAccessors.size()}; - // Copy keys in order to do the lookup. - size_t idx = 0; - for (auto& p : _inKeyAccessors) { - auto [tag, val] = p->getViewOfValue(); - key.reset(idx++, false, tag, val); + if (!reOpen || !_optimizedClose) { + _children[0]->open(reOpen); + _childOpened = true; + + if (_collatorAccessor) { + auto [tag, collatorVal] = _collatorAccessor->getViewOfValue(); + uassert( + 5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator); + auto collatorView = value::getCollatorView(collatorVal); + const value::MaterializedRowHasher hasher(collatorView); + const value::MaterializedRowEq equator(collatorView); + _ht.emplace(0, hasher, equator); + } else { + _ht.emplace(); } - auto [it, inserted] = _ht->try_emplace(std::move(key), value::MaterializedRow{0}); - if (inserted) { - // Copy keys. - const_cast<value::MaterializedRow&>(it->first).makeOwned(); - // Initialize accumulators. - it->second.resize(_outAggAccessors.size()); + _seekKeys.resize(_seekKeysAccessors.size()); + + while (_children[0]->getNext() == PlanState::ADVANCED) { + value::MaterializedRow key{_inKeyAccessors.size()}; + // Copy keys in order to do the lookup. + size_t idx = 0; + for (auto& p : _inKeyAccessors) { + auto [tag, val] = p->getViewOfValue(); + key.reset(idx++, false, tag, val); + } + + auto [it, inserted] = _ht->try_emplace(std::move(key), value::MaterializedRow{0}); + if (inserted) { + // Copy keys. + const_cast<value::MaterializedRow&>(it->first).makeOwned(); + // Initialize accumulators. + it->second.resize(_outAggAccessors.size()); + } + + // Accumulate. + _htIt = it; + for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { + auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); + _outAggAccessors[idx]->reset(owned, tag, val); + } } - // Accumulate. - _htIt = it; - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); - _outAggAccessors[idx]->reset(owned, tag, val); + if (_optimizedClose) { + _children[0]->close(); + _childOpened = false; } } - _children[0]->close(); + if (!_seekKeysAccessors.empty()) { + // Copy keys in order to do the lookup. + size_t idx = 0; + for (auto& p : _seekKeysAccessors) { + auto [tag, val] = p->getViewOfValue(); + _seekKeys.reset(idx++, false, tag, val); + } + } _htIt = _ht->end(); } @@ -163,8 +201,17 @@ PlanState HashAggStage::getNext() { auto optTimer(getOptTimer(_opCtx)); if (_htIt == _ht->end()) { - _htIt = _ht->begin(); + // First invocation of getNext() after open(). + if (!_seekKeysAccessors.empty()) { + _htIt = _ht->find(_seekKeys); + } else { + _htIt = _ht->begin(); + } + } else if (!_seekKeysAccessors.empty()) { + // Subsequent invocation with seek keys. Return only 1 single row (if any). + _htIt = _ht->end(); } else { + // Returning the results of the entire hash table. ++_htIt; } @@ -204,6 +251,11 @@ void HashAggStage::close() { trackClose(); _ht = boost::none; + + if (_childOpened) { + _children[0]->close(); + _childOpened = false; + } } std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { @@ -233,6 +285,22 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { }); ret.emplace_back("`]"); + if (!_seekKeysSlots.empty()) { + ret.emplace_back("[`"); + for (size_t idx = 0; idx < _seekKeysSlots.size(); ++idx) { + if (idx) { + ret.emplace_back("`,"); + } + + DebugPrinter::addIdentifier(ret, _seekKeysSlots[idx]); + } + ret.emplace_back("`]"); + } + + if (!_optimizedClose) { + ret.emplace_back("reopen"); + } + if (_collatorSlot) { DebugPrinter::addIdentifier(ret, *_collatorSlot); } diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h index 2010bacbf80..be8c74e1299 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.h +++ b/src/mongo/db/exec/sbe/stages/hash_agg.h @@ -49,19 +49,28 @@ namespace sbe { * from the 'input' tree are not visible higher in tree. Stages higher in the tree can only see the * slots holding the group-by keys as well as those holding the corresponding aggregate values. * + * The optional 'seekKeys', if provided, limit the results returned from the hash table only to + * those equal to seekKeys. + * + * The 'optimizedClose' flag controls whether we can close the child subtree right after building + * the hash table. If true it means that we do not expect the subtree to be reopened. + * * The optional 'collatorSlot', if provided, changes the definition of string equality used when * determining whether two group-by keys are equal. For instance, the plan may require us to do a * case-insensitive group on a string field. * * Debug string representation: * - * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] collatorSlot? childStage + * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? reopen? + * collatorSlot? childStage */ class HashAggStage final : public PlanStage { public: HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, value::SlotMap<std::unique_ptr<EExpression>> aggs, + value::SlotVector seekKeysSlots, + bool optimizedClose, boost::optional<value::SlotId> collatorSlot, PlanNodeId planNodeId); @@ -89,11 +98,18 @@ private: const value::SlotVector _gbs; const value::SlotMap<std::unique_ptr<EExpression>> _aggs; const boost::optional<value::SlotId> _collatorSlot; + const value::SlotVector _seekKeysSlots; + // When this operator does not expect to be reopened (almost always) then it can close the child + // early. + const bool _optimizedClose{true}; value::SlotAccessorMap _outAccessors; std::vector<value::SlotAccessor*> _inKeyAccessors; std::vector<std::unique_ptr<HashKeyAccessor>> _outKeyAccessors; + std::vector<value::SlotAccessor*> _seekKeysAccessors; + value::MaterializedRow _seekKeys; + std::vector<std::unique_ptr<HashAggAccessor>> _outAggAccessors; std::vector<std::unique_ptr<vm::CodeFragment>> _aggCodes; @@ -106,6 +122,7 @@ private: vm::ByteCode _bytecode; bool _compiled{false}; + bool _childOpened{false}; }; } // namespace sbe } // namespace mongo diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp index b6594aebe9a..add7be68dee 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp +++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp @@ -445,8 +445,13 @@ EvalStage makeHashAgg(EvalStage stage, for (auto& [slot, _] : aggs) { stage.outSlots.push_back(slot); } - stage.stage = sbe::makeS<sbe::HashAggStage>( - std::move(stage.stage), std::move(gbs), std::move(aggs), collatorSlot, planNodeId); + stage.stage = sbe::makeS<sbe::HashAggStage>(std::move(stage.stage), + std::move(gbs), + std::move(aggs), + sbe::makeSV(), + true /* optimized close */, + collatorSlot, + planNodeId); return stage; } |