summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2021-07-14 21:03:52 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-22 17:22:20 +0000
commitf5afeb9966a9a58f26347c3670bd42f36560eb8a (patch)
treeff175d6bb64d3fb9b4c3f80e936b94c8e2f2b445
parent491301856f66409cbc57106d785153f40316a44a (diff)
downloadmongo-f5afeb9966a9a58f26347c3670bd42f36560eb8a.tar.gz
SERVER-58431 Modify HashAgg stage in preparation for $lookup + $group pushdown
Implemented optional seek keys used to look up a particular key in the hash table.
-rw-r--r--src/mongo/db/exec/sbe/parser/parser.cpp2
-rw-r--r--src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp4
-rw-r--r--src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp74
-rw-r--r--src/mongo/db/exec/sbe/stages/hash_agg.cpp138
-rw-r--r--src/mongo/db/exec/sbe/stages/hash_agg.h19
-rw-r--r--src/mongo/db/query/sbe_stage_builder_helpers.cpp9
6 files changed, 208 insertions, 38 deletions
diff --git a/src/mongo/db/exec/sbe/parser/parser.cpp b/src/mongo/db/exec/sbe/parser/parser.cpp
index 6fd95a5d720..1679d0b900f 100644
--- a/src/mongo/db/exec/sbe/parser/parser.cpp
+++ b/src/mongo/db/exec/sbe/parser/parser.cpp
@@ -1129,6 +1129,8 @@ void Parser::walkGroup(AstQuery& ast) {
std::move(ast.nodes[inputPos]->stage),
lookupSlots(std::move(ast.nodes[0]->identifiers)),
lookupSlots(std::move(ast.nodes[1]->projects)),
+ makeSV(),
+ true,
collatorSlotPos ? lookupSlot(std::move(ast.nodes[collatorSlotPos]->identifier))
: boost::none,
getCurrentPlanNodeId());
diff --git a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
index 29532022114..e75dced43cc 100644
--- a/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
+++ b/src/mongo/db/exec/sbe/parser/sbe_parser_test.cpp
@@ -358,6 +358,8 @@ protected:
sbe::value::SlotId{3},
stage_builder::makeFunction(
"max", sbe::makeE<sbe::EVariable>(sbe::value::SlotId{1}))),
+ sbe::makeSV(),
+ true,
boost::none, /* optional collator slot */
planNodeId),
// GROUP with a collator slot.
@@ -370,6 +372,8 @@ protected:
sbe::value::SlotId{3},
stage_builder::makeFunction(
"max", sbe::makeE<sbe::EVariable>(sbe::value::SlotId{1}))),
+ sbe::makeSV(),
+ true,
sbe::value::SlotId{4}, /* optional collator slot */
planNodeId),
// LIMIT
diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
index f8972a44c87..35467329b18 100644
--- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
@@ -81,6 +81,8 @@ TEST_F(HashAggStageTest, HashAggMinMaxTest) {
collMaxSlot,
stage_builder::makeFunction(
"collMax", collExpr->clone(), makeE<EVariable>(scanSlot))),
+ makeSV(),
+ true,
boost::none,
kEmptyPlanNodeId);
@@ -135,6 +137,8 @@ TEST_F(HashAggStageTest, HashAggAddToSetTest) {
makeEM(hashAggSlot,
stage_builder::makeFunction(
"collAddToSet", std::move(collExpr), makeE<EVariable>(scanSlot))),
+ makeSV(),
+ true,
boost::none,
kEmptyPlanNodeId);
@@ -219,6 +223,8 @@ TEST_F(HashAggStageTest, HashAggCollationTest) {
"sum",
makeE<EConstant>(value::TypeTags::NumberInt64,
value::bitcastFrom<int64_t>(1)))),
+ makeSV(),
+ true,
boost::optional<value::SlotId>{useCollator, collatorSlot},
kEmptyPlanNodeId);
@@ -282,4 +288,72 @@ TEST_F(HashAggStageTest, HashAggCollationTest) {
}
}
+TEST_F(HashAggStageTest, HashAggSeekKeysTest) {
+ auto ctx = makeCompileCtx();
+
+ // Register a correlated seek slot; its accessor supplies the key used to look up one group in the hash table.
+ auto seekSlot = generateSlotId();
+ value::OwnedValueAccessor seekAccessor;
+ ctx->pushCorrelated(seekSlot, &seekAccessor);
+
+ // Build a scan of the [5,6,7,5,6,7,6,7,7] input array: '5' occurs twice, '6' three times, '7' four times.
+ auto [inputTag, inputVal] =
+ stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7));
+ auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
+
+
+ auto [outputSlot, stage] = [this, seekSlot](value::SlotId scanSlot,
+ std::unique_ptr<PlanStage> scanStage) {
+ // Build a HashAggStage that groups by scanSlot, counts rows per group via sum(1), and seeks on seekSlot.
+ auto countsSlot = generateSlotId();
+
+ auto hashAggStage = makeS<HashAggStage>(
+ std::move(scanStage),
+ makeSV(scanSlot),
+ makeEM(countsSlot,
+ stage_builder::makeFunction("sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(1)))),
+ makeSV(seekSlot),
+ true,
+ boost::none,
+ kEmptyPlanNodeId);
+
+ return std::make_pair(countsSlot, std::move(hashAggStage));
+ }(scanSlot, std::move(scanStage));
+
+ // Start with '5' as the seek value; it is set before the tree is prepared.
+ seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(5));
+
+ // Prepare the tree and get the `SlotAccessor` for the output (count) slot.
+ auto resultAccessor = prepareTree(ctx.get(), stage.get(), outputSlot);
+ ctx->popCorrelated();
+
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1Tag, res1Val] = resultAccessor->getViewOfValue();
+ // There are '2' occurrences of '5' in the input; the following getNext() must then report EOF.
+ assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+
+ // Reposition to '6': update the seek value and reopen the stage.
+ seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(6));
+ stage->open(true);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2Tag, res2Val] = resultAccessor->getViewOfValue();
+ // There are '3' occurrences of '6' in the input.
+ assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+
+ // Reposition to '7': update the seek value and reopen the stage.
+ seekAccessor.reset(value::TypeTags::NumberInt32, value::bitcastFrom<int>(7));
+ stage->open(true);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3Tag, res3Val] = resultAccessor->getViewOfValue();
+ // There are '4' occurrences of '7' in the input.
+ assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+
+ stage->close();
+}
+
} // namespace mongo::sbe
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index 90cae74c3ca..0ceb588eea0 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -38,13 +38,21 @@ namespace sbe {
HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input,
value::SlotVector gbs,
value::SlotMap<std::unique_ptr<EExpression>> aggs,
+ value::SlotVector seekKeysSlots,
+ bool optimizedClose,
boost::optional<value::SlotId> collatorSlot,
PlanNodeId planNodeId)
: PlanStage("group"_sd, planNodeId),
_gbs(std::move(gbs)),
_aggs(std::move(aggs)),
- _collatorSlot(collatorSlot) {
+ _collatorSlot(collatorSlot),
+ _seekKeysSlots(std::move(seekKeysSlots)),
+ _optimizedClose(optimizedClose) {
_children.emplace_back(std::move(input));
+ invariant(_seekKeysSlots.empty() || _seekKeysSlots.size() == _gbs.size());
+ tassert(5843100,
+ "HashAgg stage was given optimizedClose=false and seek keys",
+ _seekKeysSlots.empty() || _optimizedClose);
}
std::unique_ptr<PlanStage> HashAggStage::clone() const {
@@ -52,8 +60,13 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const {
for (auto& [k, v] : _aggs) {
aggs.emplace(k, v->clone());
}
- return std::make_unique<HashAggStage>(
- _children[0]->clone(), _gbs, std::move(aggs), _collatorSlot, _commonStats.nodeId);
+ return std::make_unique<HashAggStage>(_children[0]->clone(),
+ _gbs,
+ std::move(aggs),
+ _seekKeysSlots,
+ _optimizedClose,
+ _collatorSlot,
+ _commonStats.nodeId);
}
void HashAggStage::prepare(CompileCtx& ctx) {
@@ -78,6 +91,12 @@ void HashAggStage::prepare(CompileCtx& ctx) {
_outAccessors[slot] = _outKeyAccessors.back().get();
}
+ // Process seek keys (if any). The keys must come from outside of the subtree (by definition) so
+ // we go directly to the compilation context.
+ for (auto& slot : _seekKeysSlots) {
+ _seekKeysAccessors.emplace_back(ctx.getAccessor(slot));
+ }
+
counter = 0;
for (auto& [slot, expr] : _aggs) {
auto [it, inserted] = dupCheck.emplace(slot);
@@ -116,45 +135,64 @@ void HashAggStage::open(bool reOpen) {
auto optTimer(getOptTimer(_opCtx));
_commonStats.opens++;
- _children[0]->open(reOpen);
-
- if (_collatorAccessor) {
- auto [tag, collatorVal] = _collatorAccessor->getViewOfValue();
- uassert(5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator);
- auto collatorView = value::getCollatorView(collatorVal);
- const value::MaterializedRowHasher hasher(collatorView);
- const value::MaterializedRowEq equator(collatorView);
- _ht.emplace(0, hasher, equator);
- } else {
- _ht.emplace();
- }
- while (_children[0]->getNext() == PlanState::ADVANCED) {
- value::MaterializedRow key{_inKeyAccessors.size()};
- // Copy keys in order to do the lookup.
- size_t idx = 0;
- for (auto& p : _inKeyAccessors) {
- auto [tag, val] = p->getViewOfValue();
- key.reset(idx++, false, tag, val);
+ if (!reOpen || !_optimizedClose) {
+ _children[0]->open(reOpen);
+ _childOpened = true;
+
+ if (_collatorAccessor) {
+ auto [tag, collatorVal] = _collatorAccessor->getViewOfValue();
+ uassert(
+ 5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator);
+ auto collatorView = value::getCollatorView(collatorVal);
+ const value::MaterializedRowHasher hasher(collatorView);
+ const value::MaterializedRowEq equator(collatorView);
+ _ht.emplace(0, hasher, equator);
+ } else {
+ _ht.emplace();
}
- auto [it, inserted] = _ht->try_emplace(std::move(key), value::MaterializedRow{0});
- if (inserted) {
- // Copy keys.
- const_cast<value::MaterializedRow&>(it->first).makeOwned();
- // Initialize accumulators.
- it->second.resize(_outAggAccessors.size());
+ _seekKeys.resize(_seekKeysAccessors.size());
+
+ while (_children[0]->getNext() == PlanState::ADVANCED) {
+ value::MaterializedRow key{_inKeyAccessors.size()};
+ // Copy keys in order to do the lookup.
+ size_t idx = 0;
+ for (auto& p : _inKeyAccessors) {
+ auto [tag, val] = p->getViewOfValue();
+ key.reset(idx++, false, tag, val);
+ }
+
+ auto [it, inserted] = _ht->try_emplace(std::move(key), value::MaterializedRow{0});
+ if (inserted) {
+ // Copy keys.
+ const_cast<value::MaterializedRow&>(it->first).makeOwned();
+ // Initialize accumulators.
+ it->second.resize(_outAggAccessors.size());
+ }
+
+ // Accumulate.
+ _htIt = it;
+ for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
+ auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get());
+ _outAggAccessors[idx]->reset(owned, tag, val);
+ }
}
- // Accumulate.
- _htIt = it;
- for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
- auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get());
- _outAggAccessors[idx]->reset(owned, tag, val);
+ if (_optimizedClose) {
+ _children[0]->close();
+ _childOpened = false;
}
}
- _children[0]->close();
+ if (!_seekKeysAccessors.empty()) {
+ // Copy keys in order to do the lookup.
+ size_t idx = 0;
+ for (auto& p : _seekKeysAccessors) {
+ auto [tag, val] = p->getViewOfValue();
+ _seekKeys.reset(idx++, false, tag, val);
+ }
+ }
_htIt = _ht->end();
}
@@ -163,8 +201,17 @@ PlanState HashAggStage::getNext() {
auto optTimer(getOptTimer(_opCtx));
if (_htIt == _ht->end()) {
- _htIt = _ht->begin();
+ // First invocation of getNext() after open().
+ if (!_seekKeysAccessors.empty()) {
+ _htIt = _ht->find(_seekKeys);
+ } else {
+ _htIt = _ht->begin();
+ }
+ } else if (!_seekKeysAccessors.empty()) {
+ // Subsequent invocation with seek keys. Return only a single row (if any).
+ _htIt = _ht->end();
} else {
+ // Returning the results of the entire hash table.
++_htIt;
}
@@ -204,6 +251,11 @@ void HashAggStage::close() {
trackClose();
_ht = boost::none;
+
+ if (_childOpened) {
+ _children[0]->close();
+ _childOpened = false;
+ }
}
std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const {
@@ -233,6 +285,22 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const {
});
ret.emplace_back("`]");
+ if (!_seekKeysSlots.empty()) {
+ ret.emplace_back("[`");
+ for (size_t idx = 0; idx < _seekKeysSlots.size(); ++idx) {
+ if (idx) {
+ ret.emplace_back("`,");
+ }
+
+ DebugPrinter::addIdentifier(ret, _seekKeysSlots[idx]);
+ }
+ ret.emplace_back("`]");
+ }
+
+ if (!_optimizedClose) {
+ ret.emplace_back("reopen");
+ }
+
if (_collatorSlot) {
DebugPrinter::addIdentifier(ret, *_collatorSlot);
}
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h
index 2010bacbf80..be8c74e1299 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.h
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.h
@@ -49,19 +49,28 @@ namespace sbe {
* from the 'input' tree are not visible higher in tree. Stages higher in the tree can only see the
* slots holding the group-by keys as well as those holding the corresponding aggregate values.
*
+ * The optional 'seekKeys', if provided, limit the results returned from the hash table only to
+ * those equal to seekKeys.
+ *
+ * The 'optimizedClose' flag controls whether we can close the child subtree right after building
+ * the hash table. If true it means that we do not expect the subtree to be reopened.
+ *
* The optional 'collatorSlot', if provided, changes the definition of string equality used when
* determining whether two group-by keys are equal. For instance, the plan may require us to do a
* case-insensitive group on a string field.
*
* Debug string representation:
*
- * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] collatorSlot? childStage
+ * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? reopen?
+ * collatorSlot? childStage
*/
class HashAggStage final : public PlanStage {
public:
HashAggStage(std::unique_ptr<PlanStage> input,
value::SlotVector gbs,
value::SlotMap<std::unique_ptr<EExpression>> aggs,
+ value::SlotVector seekKeysSlots,
+ bool optimizedClose,
boost::optional<value::SlotId> collatorSlot,
PlanNodeId planNodeId);
@@ -89,11 +98,18 @@ private:
const value::SlotVector _gbs;
const value::SlotMap<std::unique_ptr<EExpression>> _aggs;
const boost::optional<value::SlotId> _collatorSlot;
+ const value::SlotVector _seekKeysSlots;
+ // When this operator does not expect to be reopened (almost always) then it can close the child
+ // early.
+ const bool _optimizedClose{true};
value::SlotAccessorMap _outAccessors;
std::vector<value::SlotAccessor*> _inKeyAccessors;
std::vector<std::unique_ptr<HashKeyAccessor>> _outKeyAccessors;
+ std::vector<value::SlotAccessor*> _seekKeysAccessors;
+ value::MaterializedRow _seekKeys;
+
std::vector<std::unique_ptr<HashAggAccessor>> _outAggAccessors;
std::vector<std::unique_ptr<vm::CodeFragment>> _aggCodes;
@@ -106,6 +122,7 @@ private:
vm::ByteCode _bytecode;
bool _compiled{false};
+ bool _childOpened{false};
};
} // namespace sbe
} // namespace mongo
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
index b6594aebe9a..add7be68dee 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
@@ -445,8 +445,13 @@ EvalStage makeHashAgg(EvalStage stage,
for (auto& [slot, _] : aggs) {
stage.outSlots.push_back(slot);
}
- stage.stage = sbe::makeS<sbe::HashAggStage>(
- std::move(stage.stage), std::move(gbs), std::move(aggs), collatorSlot, planNodeId);
+ stage.stage = sbe::makeS<sbe::HashAggStage>(std::move(stage.stage),
+ std::move(gbs),
+ std::move(aggs),
+ sbe::makeSV(),
+ true /* optimized close */,
+ collatorSlot,
+ planNodeId);
return stage;
}