diff options
author | Irina Yatsenko <irina.yatsenko@mongodb.com> | 2022-08-04 22:50:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-04 23:58:18 +0000 |
commit | ede6b2adff822dd767cef40c5e1351ee8ed27aaa (patch) | |
tree | 2bffb9eb6be4bcf887914ed5af0c5972a4a75708 /src/mongo/db/exec/sbe/stages | |
parent | 441237232383286003f6cf231889fffe93225317 (diff) | |
download | mongo-ede6b2adff822dd767cef40c5e1351ee8ed27aaa.tar.gz |
SERVER-67336 Per-path filters
Diffstat (limited to 'src/mongo/db/exec/sbe/stages')
-rw-r--r-- | src/mongo/db/exec/sbe/stages/column_scan.cpp | 283 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/column_scan.h | 74 |
2 files changed, 311 insertions, 46 deletions
diff --git a/src/mongo/db/exec/sbe/stages/column_scan.cpp b/src/mongo/db/exec/sbe/stages/column_scan.cpp index a003515427f..35e023bfac3 100644 --- a/src/mongo/db/exec/sbe/stages/column_scan.cpp +++ b/src/mongo/db/exec/sbe/stages/column_scan.cpp @@ -52,10 +52,12 @@ TranslatedCell translateCell(PathView path, const SplitCellView& splitCellView) ColumnScanStage::ColumnScanStage(UUID collectionUuid, StringData columnIndexName, std::vector<std::string> paths, + std::vector<bool> includeInOutput, boost::optional<value::SlotId> recordIdSlot, boost::optional<value::SlotId> reconstuctedRecordSlot, value::SlotId rowStoreSlot, std::unique_ptr<EExpression> rowStoreExpr, + std::vector<PathFilter> filteredPaths, PlanYieldPolicy* yieldPolicy, PlanNodeId nodeId, bool participateInTrialRunTracking) @@ -63,26 +65,39 @@ ColumnScanStage::ColumnScanStage(UUID collectionUuid, _collUuid(collectionUuid), _columnIndexName(columnIndexName), _paths(std::move(paths)), + _includeInOutput(std::move(includeInOutput)), _recordIdSlot(recordIdSlot), _reconstructedRecordSlot(reconstuctedRecordSlot), _rowStoreSlot(rowStoreSlot), - _rowStoreExpr(std::move(rowStoreExpr)) {} + _rowStoreExpr(std::move(rowStoreExpr)), + _filteredPaths(std::move(filteredPaths)) { + invariant(_filteredPaths.size() <= _paths.size(), + "Filtered paths should be a subset of all paths"); + invariant(_paths.size() == _includeInOutput.size()); +} std::unique_ptr<PlanStage> ColumnScanStage::clone() const { - std::vector<std::unique_ptr<EExpression>> pathExprs; + std::vector<PathFilter> filteredPaths; + for (const auto& fp : _filteredPaths) { + filteredPaths.emplace_back(fp.pathIndex, fp.filterExpr->clone(), fp.inputSlotId); + } return std::make_unique<ColumnScanStage>(_collUuid, _columnIndexName, _paths, + _includeInOutput, _recordIdSlot, _reconstructedRecordSlot, _rowStoreSlot, _rowStoreExpr ? _rowStoreExpr->clone() : nullptr, + std::move(filteredPaths), _yieldPolicy, _commonStats.nodeId, _participateInTrialRunTracking); } void ColumnScanStage::prepare(CompileCtx& ctx) { + ctx.root = this; + if (_reconstructedRecordSlot) { _reconstructedRecordAccessor = std::make_unique<value::OwnedValueAccessor>(); } @@ -92,10 +107,19 @@ void ColumnScanStage::prepare(CompileCtx& ctx) { _rowStoreAccessor = std::make_unique<value::OwnedValueAccessor>(); if (_rowStoreExpr) { - ctx.root = this; _rowStoreExprCode = _rowStoreExpr->compile(ctx); } + _filterInputAccessors.resize(_filteredPaths.size()); + for (size_t idx = 0; idx < _filterInputAccessors.size(); ++idx) { + auto slot = _filteredPaths[idx].inputSlotId; + auto [it, inserted] = _filterInputAccessorsMap.emplace(slot, &_filterInputAccessors[idx]); + uassert(6610212, str::stream() << "duplicate slot: " << slot, inserted); + } + for (auto& filteredPath : _filteredPaths) { + _filterExprsCode.emplace_back(filteredPath.filterExpr->compile(ctx)); + } + tassert(6610200, "'_coll' should not be initialized prior to 'acquireCollection()'", !_coll); std::tie(_coll, _collName, _catalogEpoch) = acquireCollection(_opCtx, _collUuid); @@ -120,12 +144,23 @@ value::SlotAccessor* ColumnScanStage::getAccessor(CompileCtx& ctx, value::SlotId if (_rowStoreSlot == slot) { return _rowStoreAccessor.get(); } + + if (auto it = _filterInputAccessorsMap.find(slot); it != _filterInputAccessorsMap.end()) { + return it->second; + } + return ctx.getAccessor(slot); } void ColumnScanStage::doSaveState(bool relinquishCursor) { + if (_denseColumnCursor) { + _denseColumnCursor->makeOwned(); + _denseColumnCursor->cursor().save(); + } + for (auto& cursor : _columnCursors) { cursor.makeOwned(); + cursor.cursor().save(); } if (_rowStoreCursor && relinquishCursor) { @@ -136,9 +171,6 @@ void ColumnScanStage::doSaveState(bool relinquishCursor) { _rowStoreCursor->setSaveStorageCursorOnDetachFromOperationContext(!relinquishCursor); } - for (auto& cursor : _columnCursors) { - cursor.cursor().save(); - } for (auto& [path, cursor] : _parentPathCursors) { cursor->cursor().saveUnpositioned(); } @@ -170,6 +202,9 @@ void ColumnScanStage::doRestoreState(bool relinquishCursor) { } } + if (_denseColumnCursor) { + _denseColumnCursor->cursor().restore(); + } for (auto& cursor : _columnCursors) { cursor.cursor().restore(); } @@ -182,6 +217,9 @@ void ColumnScanStage::doDetachFromOperationContext() { if (_rowStoreCursor) { _rowStoreCursor->detachFromOperationContext(); } + if (_denseColumnCursor) { + _denseColumnCursor->cursor().detachFromOperationContext(); + } for (auto& cursor : _columnCursors) { cursor.cursor().detachFromOperationContext(); } @@ -194,6 +232,9 @@ void ColumnScanStage::doAttachToOperationContext(OperationContext* opCtx) { if (_rowStoreCursor) { _rowStoreCursor->reattachToOperationContext(opCtx); } + if (_denseColumnCursor) { + _denseColumnCursor->cursor().reattachToOperationContext(opCtx); + } for (auto& cursor : _columnCursors) { cursor.cursor().reattachToOperationContext(opCtx); } @@ -247,24 +288,30 @@ void ColumnScanStage::open(bool reOpen) { auto iam = static_cast<ColumnStoreAccessMethod*>(entry->accessMethod()); - // Eventually we can not include this column for the cases where a known dense column (_id) - // is being read anyway. - - // Add a stats struct that will be shared by overall ColumnScanStats and individual - // cursor. - _columnCursors.emplace_back( - iam->storage()->newCursor(_opCtx, ColumnStore::kRowIdPath), - _specificStats.cursorStats.emplace_back(ColumnStore::kRowIdPath.toString(), false)); - - for (auto&& path : _paths) { - _columnCursors.emplace_back(iam->storage()->newCursor(_opCtx, path), - _specificStats.cursorStats.emplace_back(path, true)); + // The dense _recordId column is only needed if there are no filters (TODO SERVER-68377: + // eventually we can avoid including this column for the cases where a known dense column + // such as _id is being read anyway). + if (_filteredPaths.empty()) { + _denseColumnCursor = std::make_unique<ColumnCursor>( + iam->storage()->newCursor(_opCtx, ColumnStore::kRowIdPath), + _specificStats.cursorStats.emplace_back(ColumnStore::kRowIdPath.toString(), + false /*includeInOutput*/)); + } + for (size_t i = 0; i < _paths.size(); i++) { + _columnCursors.emplace_back( + iam->storage()->newCursor(_opCtx, _paths[i]), + _specificStats.cursorStats.emplace_back(_paths[i], _includeInOutput[i])); } } + // Set the cursors. + if (_denseColumnCursor) { + _denseColumnCursor->seekAtOrPast(RecordId()); + } for (auto& columnCursor : _columnCursors) { columnCursor.seekAtOrPast(RecordId()); } + _recordId = _filteredPaths.empty() ? findMinRecordId() : findNextRecordIdForFilteredColumns(); _open = true; } @@ -323,6 +370,152 @@ void ColumnScanStage::readParentsIntoObj(StringData path, } } +// The result of the filter predicate will be the same regardless of sparseness or sub objects, +// therefore we don't look at the parents and don't consult the row store. +// +// (TODO SERVER-68285) Currently, the per-path predicates expect an object to run on, so we create +// one. This is very inefficient (profiles show considerable time spent under Object::push_back) and +// should be replaced with predicates that run directly on values. The fact that the implementation +// of the filter depends on the implementation of the expressions passed to the stage indicates a +// tight coupling. Unfortunately, this dependency can only be discovered at runtime. +bool ColumnScanStage::checkFilter(CellView cell, size_t filterIndex, const PathValue& path) { + auto [tag, val] = value::makeNewObject(); + value::ValueGuard materializedObjGuard(tag, val); + auto& obj = *value::bitcastTo<value::Object*>(val); + + auto splitCellView = SplitCellView::parse(cell); + auto translatedCell = translateCell(path, splitCellView); + addCellToObject(translatedCell, obj); + + _filterInputAccessors[filterIndex].reset(true /*owned*/, tag, val); + materializedObjGuard.reset(); + return _bytecode.runPredicate(_filterExprsCode[filterIndex].get()); +} + +RecordId ColumnScanStage::findNextRecordIdForFilteredColumns() { + invariant(!_filteredPaths.empty()); + + // Initialize 'targetRecordId' from the filtered cursor we are currently iterating. + RecordId targetRecordId; + { + auto& cursor = cursorForFilteredPath(_filteredPaths[_nextUnmatched]); + if (!cursor.lastCell()) { + return RecordId(); // Have exhausted one of the columns. + } + targetRecordId = cursor.lastCell()->rid; + } + + size_t matchedSinceAdvance = 0; + // The loop will terminate because when 'matchedSinceAdvance' is reset the 'targetRecordId' is + // guaranteed to advance. It will do no more than N 'next()' calls across all cursors, where N + // is the number of records (might do fewer, if for some columns there are missing values). The + // number of seeks and filter checks depends on the selectivity of the filters. + while (matchedSinceAdvance < _filteredPaths.size()) { + auto& cursor = cursorForFilteredPath(_filteredPaths[_nextUnmatched]); + + // Avoid seeking into the column that we started with. + auto& result = cursor.lastCell(); + if (result && result->rid < targetRecordId) { + result = cursor.seekAtOrPast(targetRecordId); + } + if (!result) { + return RecordId(); + } + + if (result->rid > targetRecordId) { + // The column skipped ahead - have to restart at this new record ID. + matchedSinceAdvance = 0; + targetRecordId = result->rid; + } + + if (!checkFilter(result->value, _nextUnmatched, cursor.path())) { + // Advance the column until find a match and restart at this new record ID. + do { + result = cursor.next(); + if (!result) { + return RecordId(); + } + } while (!checkFilter(result->value, _nextUnmatched, cursor.path())); + matchedSinceAdvance = 0; + invariant(result->rid > targetRecordId); + targetRecordId = result->rid; + } + ++matchedSinceAdvance; + _nextUnmatched = (_nextUnmatched + 1) % _filteredPaths.size(); + } + invariant(!targetRecordId.isNull()); + + // Ensure that _all_ cursors have caugth up with the filtered record ID. Some of the cursors + // might skip ahead, which would mean the column is missing a value for this 'recordId'. + for (auto& cursor : _columnCursors) { + const auto& result = cursor.lastCell(); + if (result && result->rid < targetRecordId) { + cursor.seekAtOrPast(targetRecordId); + } + } + + return targetRecordId; +} + +RecordId ColumnScanStage::findMinRecordId() const { + if (_denseColumnCursor) { + // The cursor of the dense column cannot be ahead of any other, so it's always at the + // minimum. + auto& result = _denseColumnCursor->lastCell(); + if (!result) { + return RecordId(); + } + return result->rid; + } + + auto recordId = RecordId(); + for (const auto& cursor : _columnCursors) { + const auto& result = cursor.lastCell(); + if (result && (recordId.isNull() || result->rid < recordId)) { + recordId = result->rid; + } + } + return recordId; +} + +RecordId ColumnScanStage::advanceCursors() { + if (!_filteredPaths.empty()) { + // Nudge forward the "active" filtered cursor. The remaining ones will be synchronized by + // 'findNextRecordIdForFilteredColumns()'. + cursorForFilteredPath(_filteredPaths[_nextUnmatched]).next(); + return findNextRecordIdForFilteredColumns(); + } + + // In absence of filters all cursors iterate forward on their own. Some of the cursors might be + // ahead of the current '_recordId' because there are gaps in their columns - don't move them + // but only those that are at '_recordId' and therefore their values have been consumed. While + // at it, compute the new min record ID. + auto nextRecordId = RecordId(); + if (_denseColumnCursor) { + invariant(_denseColumnCursor->lastCell()->rid == _recordId, + "Dense cursor should always be at the current minimum record ID"); + auto cell = _denseColumnCursor->next(); + if (!cell) { + return RecordId(); + } + nextRecordId = cell->rid; + } + for (auto& cursor : _columnCursors) { + auto& cell = cursor.lastCell(); + if (!cell) { + continue; // this column has been exhausted + } + if (cell->rid == _recordId) { + cell = cursor.next(); + } + if (cell && (nextRecordId.isNull() || cell->rid < nextRecordId)) { + invariant(!_denseColumnCursor, "Dense cursor should have the next lowest record ID"); + nextRecordId = cell->rid; + } + } + return nextRecordId; +} + PlanState ColumnScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); @@ -332,35 +525,32 @@ PlanState ColumnScanStage::getNext() { checkForInterrupt(_opCtx); - // Find minimum record ID of all column cursors. - _recordId = RecordId(); - for (auto& cursor : _columnCursors) { - auto& result = cursor.lastCell(); - if (result && (_recordId.isNull() || result->rid < _recordId)) { - _recordId = result->rid; - } - } - if (_recordId.isNull()) { return trackPlanState(PlanState::IS_EOF); } + bool useRowStore = false; + auto [outTag, outVal] = value::makeNewObject(); auto& outObj = *value::bitcastTo<value::Object*>(outVal); value::ValueGuard materializedObjGuard(outTag, outVal); StringDataSet pathsRead; - bool useRowStore = false; for (size_t i = 0; i < _columnCursors.size(); ++i) { - auto& lastCell = _columnCursors[i].lastCell(); - const auto& path = _columnCursors[i].path(); + if (!_includeInOutput[i]) { + continue; + } + auto& cursor = _columnCursors[i]; + auto& lastCell = cursor.lastCell(); boost::optional<SplitCellView> splitCellView; if (lastCell && lastCell->rid == _recordId) { splitCellView = SplitCellView::parse(lastCell->value); } - if (_columnCursors[i].includeInOutput() && !useRowStore) { + const auto& path = cursor.path(); + + if (!useRowStore) { if (splitCellView && (splitCellView->hasSubPaths || splitCellView->hasDuplicateFields)) { useRowStore = true; @@ -376,10 +566,6 @@ PlanState ColumnScanStage::getNext() { } } } - - if (splitCellView) { - _columnCursors[i].next(); - } } if (useRowStore) { @@ -423,6 +609,8 @@ PlanState ColumnScanStage::getNext() { _tracker = nullptr; uasserted(ErrorCodes::QueryTrialRunCompleted, "Trial run early exit in scan"); } + + _recordId = advanceCursors(); return trackPlanState(PlanState::ADVANCED); } @@ -507,6 +695,31 @@ std::vector<DebugPrinter::Block> ColumnScanStage::debugPrint() const { } ret.emplace_back(DebugPrinter::Block("`]")); + // Print out per-path filters (if any). + if (!_filteredPaths.empty()) { + ret.emplace_back(DebugPrinter::Block("[`")); + for (size_t idx = 0; idx < _filteredPaths.size(); ++idx) { + if (idx) { + ret.emplace_back(DebugPrinter::Block("`;")); + } + + ret.emplace_back(str::stream() + << "\"" << _paths[_filteredPaths[idx].pathIndex] << "\": "); + DebugPrinter::addIdentifier(ret, _filteredPaths[idx].inputSlotId); + ret.emplace_back(DebugPrinter::Block("`,")); + DebugPrinter::addBlocks(ret, _filteredPaths[idx].filterExpr->debugPrint()); + } + ret.emplace_back(DebugPrinter::Block("`]")); + } + + if (_rowStoreExpr) { + ret.emplace_back(DebugPrinter::Block("[`")); + DebugPrinter::addIdentifier(ret, _rowStoreSlot); + ret.emplace_back(DebugPrinter::Block("`,")); + DebugPrinter::addBlocks(ret, _rowStoreExpr->debugPrint()); + ret.emplace_back(DebugPrinter::Block("`]")); + } + ret.emplace_back("@\"`"); DebugPrinter::addIdentifier(ret, _collUuid.toString()); ret.emplace_back("`\""); diff --git a/src/mongo/db/exec/sbe/stages/column_scan.h b/src/mongo/db/exec/sbe/stages/column_scan.h index 94bc8fa4034..7d30152f46d 100644 --- a/src/mongo/db/exec/sbe/stages/column_scan.h +++ b/src/mongo/db/exec/sbe/stages/column_scan.h @@ -41,24 +41,40 @@ namespace sbe { /** * A stage that scans provided columnar index. * - * Currently the stage produces an object into the 'recordSlot' such that accessing any of the given - * paths in it would be equivalent to accessing the paths in the corresponding object from the - * associated row store. In the future the stage will be extended to produce separate outputs for - * each path without materializing this intermediate object unless requested by the client. + * Currently the stage produces an object into the 'reconstructedRecordSlot' such that accessing any + * of the given paths in it would be equivalent to accessing the paths in the corresponding object + * from the associated row store. In the future the stage will be extended to produce separate + * outputs for each path without materializing this intermediate object unless requested by the + * client. * * Debug string representation: * - * COLUMN_SCAN recordSlot|none recordIdSlot|none [path_1, ..., path_n] collectionUuid indexName + * COLUMN_SCAN reconstructedRecordSlot|none recordIdSlot|none [path_1, ..., path_n] + * [filter_path_1: filterSlot_1, filterExpr_1; ...]? [roStoreSlot, rowStoreExpr]? + * collectionUuid indexName */ class ColumnScanStage final : public PlanStage { public: + struct PathFilter { + size_t pathIndex; // index into the paths array the stage will be using + std::unique_ptr<EExpression> filterExpr; + value::SlotId inputSlotId; + + PathFilter(size_t pathIndex, + std::unique_ptr<EExpression> filterExpr, + value::SlotId inputSlotId) + : pathIndex(pathIndex), filterExpr(std::move(filterExpr)), inputSlotId(inputSlotId) {} + }; + ColumnScanStage(UUID collectionUuid, StringData columnIndexName, std::vector<std::string> paths, + std::vector<bool> includeInOutput, boost::optional<value::SlotId> recordIdSlot, boost::optional<value::SlotId> reconstructedRecordSlot, value::SlotId rowStoreSlot, std::unique_ptr<EExpression> rowStoreExpr, + std::vector<PathFilter> filteredPaths, PlanYieldPolicy* yieldPolicy, PlanNodeId planNodeId, bool participateInTrialRunTracking = true); @@ -154,6 +170,9 @@ private: boost::optional<FullCellView>& lastCell() { return _lastCell; } + const boost::optional<FullCellView>& lastCell() const { + return _lastCell; + } size_t numNexts() const { return _stats.numNexts; @@ -184,6 +203,21 @@ private: void readParentsIntoObj(StringData path, value::Object* out, StringDataSet* pathsReadSetOut); + bool checkFilter(CellView cell, size_t filterIndex, const PathValue& path); + + // Finds the smallest record ID such that: + // 1) it is greater or equal to the record ID of all filtered columns cursors prior to the call; + // 2) the record with this ID passes the filters of all filtered columns. + // Ensures that the cursors are set to this record ID unless it's missing in the column (which + // is only possible for the non-filtered columns). + RecordId findNextRecordIdForFilteredColumns(); + + // Finds the lowest record ID across all cursors. Doesn't move any of the cursors. + RecordId findMinRecordId() const; + + // Move cursors to the next record to be processed. + RecordId advanceCursors(); + // The columnar index this stage is scanning and the associated row store collection. const UUID _collUuid; const std::string _columnIndexName; @@ -192,13 +226,16 @@ private: boost::optional<uint64_t> _catalogEpoch; // and are not changed afterwards. std::weak_ptr<const IndexCatalogEntry> _weakIndexCatalogEntry; - // Paths to be read from the index. + // Paths to be read from the index. '_includeInOutput' defines which of the fields should be + // included into the reconstructed record and the order of paths in '_paths' defines the + // orderding of the fields. The two vectors should have the same size. NB: No paths is possible + // when no filters are used and only constant computed columns are projected. In this case only + // the dense record ID column will be read. const std::vector<std::string> _paths; + const std::vector<bool> _includeInOutput; // The record id in the row store that is used to connect the per-path entries in the columnar - // index and to retrieve the full record from the row store, if necessary. Because we put into - // the slot the address of record id, we must guarantee that its lifetime is as long as the - // stage's. + // index and to retrieve the full record from the row store, if necessary. RecordId _recordId; const boost::optional<value::SlotId> _recordIdSlot; @@ -218,17 +255,32 @@ private: const value::SlotId _rowStoreSlot; const std::unique_ptr<EExpression> _rowStoreExpr; + // Per path filters. The slots must be allocated by the client but downstream stages must not + // read from them. Multiple filters form a conjunction where each branch of the AND only passes + // when a value exists. Empty '_filteredPaths' means there are no filters. + const std::vector<PathFilter> _filteredPaths; + ColumnCursor& cursorForFilteredPath(const PathFilter& fp) { + return _columnCursors[fp.pathIndex]; + } + size_t _nextUnmatched = 0; // used when searching for the next matching record + std::unique_ptr<value::OwnedValueAccessor> _reconstructedRecordAccessor; std::unique_ptr<value::OwnedValueAccessor> _recordIdAccessor; std::unique_ptr<value::OwnedValueAccessor> _rowStoreAccessor; + std::vector<value::OwnedValueAccessor> _filterInputAccessors; + value::SlotAccessorMap _filterInputAccessorsMap; vm::ByteCode _bytecode; std::unique_ptr<vm::CodeFragment> _rowStoreExprCode; + std::vector<std::unique_ptr<vm::CodeFragment>> _filterExprsCode; - // Cursors to simultaneously read from the sections of the index for each path (and, possibly, - // auxiliary sections) and from the row store. + // Cursors to simultaneously read from the sections of the index for each path. std::vector<ColumnCursor> _columnCursors; StringMap<std::unique_ptr<ColumnCursor>> _parentPathCursors; + // Dense column contains record ids for all records. It is necessary to support projection + // semantics for missing values on paths. + std::unique_ptr<ColumnCursor> _denseColumnCursor; + // Cursor into the associated row store. std::unique_ptr<SeekableRecordCursor> _rowStoreCursor; bool _open{false}; |