diff options
author | Ian Boros <ian.boros@mongodb.com> | 2022-04-27 11:01:18 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-28 16:01:18 +0000 |
commit | cadaba712c0f063816faa766098725da73ad073e (patch) | |
tree | 129440f94e785a26b0d0b077e99d0ada8ae72962 /src/mongo/db/exec/sbe | |
parent | d40ef71d68e67e4b55ffc8c1274a38ed4ead1f97 (diff) | |
download | mongo-cadaba712c0f063816faa766098725da73ad073e.tar.gz |
SERVER-66022 Add fake column index and SBE integration
Diffstat (limited to 'src/mongo/db/exec/sbe')
-rw-r--r-- | src/mongo/db/exec/sbe/stages/column_scan.cpp | 217 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/column_scan.h | 36 |
2 files changed, 219 insertions, 34 deletions
diff --git a/src/mongo/db/exec/sbe/stages/column_scan.cpp b/src/mongo/db/exec/sbe/stages/column_scan.cpp index 4c1e1d7da46..ef70102a8c0 100644 --- a/src/mongo/db/exec/sbe/stages/column_scan.cpp +++ b/src/mongo/db/exec/sbe/stages/column_scan.cpp @@ -32,6 +32,7 @@ #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/size_estimator.h" +#include "mongo/db/exec/sbe/values/columnar.h" namespace mongo { namespace sbe { @@ -127,12 +128,19 @@ value::SlotAccessor* ColumnScanStage::getAccessor(CompileCtx& ctx, value::SlotId } void ColumnScanStage::doSaveState(bool relinquishCursor) { - if (_cursor && relinquishCursor) { - _cursor->save(); + if (_rowStoreCursor && relinquishCursor) { + _rowStoreCursor->save(); } - if (_cursor) { - _cursor->setSaveStorageCursorOnDetachFromOperationContext(!relinquishCursor); + if (_rowStoreCursor) { + _rowStoreCursor->setSaveStorageCursorOnDetachFromOperationContext(!relinquishCursor); + } + + for (auto& cursor : _columnCursors) { + cursor.cursor->save(); + } + for (auto& [path, cursor] : _parentPathCursors) { + cursor->save(); } _coll.reset(); @@ -150,23 +158,42 @@ void ColumnScanStage::doRestoreState(bool relinquishCursor) { tassert(6298603, "Catalog epoch should be initialized", _catalogEpoch); _coll = restoreCollection(_opCtx, *_collName, _collUuid, *_catalogEpoch); - if (_cursor) { + if (_rowStoreCursor) { if (relinquishCursor) { - const bool couldRestore = _cursor->restore(); + const bool couldRestore = _rowStoreCursor->restore(); invariant(couldRestore); } } + + for (auto& cursor : _columnCursors) { + cursor.cursor->restore(); + } + for (auto& [path, cursor] : _parentPathCursors) { + cursor->restore(); + } } void ColumnScanStage::doDetachFromOperationContext() { - if (_cursor) { - _cursor->detachFromOperationContext(); + if (_rowStoreCursor) { + _rowStoreCursor->detachFromOperationContext(); + } + for (auto& cursor : _columnCursors) { + cursor.cursor->detachFromOperationContext(); + } + for (auto& [path, cursor] : _parentPathCursors) { + cursor->detachFromOperationContext(); } } void ColumnScanStage::doAttachToOperationContext(OperationContext* opCtx) { - if (_cursor) { - _cursor->reattachToOperationContext(opCtx); + if (_rowStoreCursor) { + _rowStoreCursor->reattachToOperationContext(opCtx); + } + for (auto& cursor : _columnCursors) { + cursor.cursor->reattachToOperationContext(opCtx); + } + for (auto& [path, cursor] : _parentPathCursors) { + cursor->reattachToOperationContext(opCtx); } } @@ -189,27 +216,92 @@ void ColumnScanStage::open(bool reOpen) { if (_open) { tassert(6298604, "reopened ColumnScanStage but reOpen=false", reOpen); tassert(6298605, "ColumnScanStage is open but _coll is not null", _coll); - tassert(6298606, "ColumnScanStage is open but don't have _cursor", _cursor); + tassert(6298606, "ColumnScanStage is open but don't have _rowStoreCursor", _rowStoreCursor); } else { tassert(6298607, "first open to ColumnScanStage but reOpen=true", !reOpen); if (!_coll) { // We're being opened after 'close()'. We need to re-acquire '_coll' in this case and // make some validity checks (the collection has not been dropped, renamed, etc.). - tassert(6298608, "ColumnScanStage is not open but have _cursor", !_cursor); + tassert( + 6298608, "ColumnScanStage is not open but have _rowStoreCursor", !_rowStoreCursor); tassert(6298609, "Collection name should be initialized", _collName); tassert(6298610, "Catalog epoch should be initialized", _catalogEpoch); _coll = restoreCollection(_opCtx, *_collName, _collUuid, *_catalogEpoch); } } - if (!_cursor) { - _cursor = _coll->getCursor(_opCtx, true); + if (!_rowStoreCursor) { + _rowStoreCursor = _coll->getCursor(_opCtx, true); + } + + if (_columnCursors.empty()) { + // We fake the special Row ID column by just using _id, for now. + _columnCursors.push_back(ColumnCursor{ + std::make_unique<FakeCursorForPath>("_id", _coll->getCursor(_opCtx, true)), + boost::none, + false /* add to document */ + }); + + for (auto&& path : _paths) { + _columnCursors.push_back(ColumnCursor{ + std::make_unique<FakeCursorForPath>(path, _coll->getCursor(_opCtx, true)), + boost::none, + true /* add to document */ + }); + } } _open = true; _firstGetNext = true; } +void ColumnScanStage::readParentsIntoObj(StringData path, + value::Object* outObj, + StringDataSet* pathsReadSetOut, + bool first) { + auto parent = ColumnStore::getParentPath(path); + + // If a top-level path doesn't exist, it just doesn't exist. It can't exist in some places + // within a document but not others. No further inspection is necessary. + + if (!parent) { + return; + } + + if (pathsReadSetOut->contains(*parent)) { + // We've already read the parent in, so skip it. + return; + } + + // Create the parent path cursor if necessary. + + // First we try to emplace a nullptr, so that we avoid creating the cursor when we don't have + // to. + auto [it, inserted] = _parentPathCursors.try_emplace(*parent, nullptr); + + // If we inserted a new entry, replace the null with an actual cursor. + if (inserted) { + invariant(it->second == nullptr); + it->second = std::make_unique<FakeCursorForPath>(*parent, _coll->getCursor(_opCtx, true)); + } + + auto optCell = it->second->seekExact(_recordId); + pathsReadSetOut->insert(*parent); + + if (!optCell || optCell->isSparse) { + // We need this cell's parent too. + readParentsIntoObj(*parent, outObj, pathsReadSetOut, false); + } + + if (optCell) { + auto translatedCell = + TranslatedCell{optCell->arrayInfo, *parent, optCell->tags, optCell->vals}; + + addCellToObject(translatedCell, *outObj); + } +} + + PlanState ColumnScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); @@ -217,32 +309,90 @@ PlanState ColumnScanStage::getNext() { // case it yields as the state will be completely overwritten after the next() call. disableSlotAccess(); - // This call to checkForInterrupt() may result in a call to save() or restore() on the entire - // PlanStage tree if a yield occurs. It's important that we call checkForInterrupt() before - // checking '_needsToCheckCappedPositionLost' since a call to restoreState() may set - // '_needsToCheckCappedPositionLost'. checkForInterrupt(_opCtx); - auto nextRecord = _cursor->next(); - _firstGetNext = false; + if (_firstGetNext) { + for (size_t i = 0; i < _columnCursors.size(); ++i) { + _columnCursors[i].next(); + } + _firstGetNext = false; + } + + // Find minimum record ID of all column cursors. + _recordId = RecordId(); + for (auto& cursor : _columnCursors) { + auto& result = cursor.lastCell; + if (result && (_recordId.isNull() || result->rid < _recordId)) { + _recordId = result->rid; + } + } - if (!nextRecord) { + if (_recordId.isNull()) { return trackPlanState(PlanState::IS_EOF); } - if (_recordIdAccessor) { - _recordId = nextRecord->id; - _recordIdAccessor->reset( - false, value::TypeTags::RecordId, value::bitcastFrom<RecordId*>(&_recordId)); + auto [outTag, outVal] = value::makeNewObject(); + auto& outObj = *value::bitcastTo<value::Object*>(outVal); + value::ValueGuard materializedObjGuard(outTag, outVal); + + StringDataSet parentPathsRead; + bool useRowStore = false; + for (size_t i = 0; i < _columnCursors.size(); ++i) { + auto& lastCell = _columnCursors[i].lastCell; + + const FakeCell* cellForRid = + (lastCell && lastCell->rid == _recordId) ? lastCell.get_ptr() : nullptr; + const auto& path = _columnCursors[i].cursor->path(); + + if (cellForRid && (cellForRid->hasSubPaths || cellForRid->hasDuplicateFields)) { + useRowStore = true; + } else if (!useRowStore && _columnCursors[i].includeInOutput) { + if (!cellForRid || cellForRid->isSparse) { + // Must read in the parent information first. + readParentsIntoObj(path, &outObj, &parentPathsRead); + } + + if (cellForRid) { + auto translatedCell = TranslatedCell{cellForRid->arrayInfo, + _columnCursors[i].path(), + cellForRid->tags, + cellForRid->vals}; + + addCellToObject(translatedCell, outObj); + } + } + + if (cellForRid) { + _columnCursors[i].next(); + } } - _rowStoreAccessor->reset(false, - value::TypeTags::bsonObject, - value::bitcastFrom<const char*>(nextRecord->data.data())); + if (useRowStore) { + std::cout << "Using row store for rid " << _recordId << std::endl; - if (_recordExpr) { - auto [owned, tag, val] = _bytecode.run(_recordExprCode.get()); - _recordAccessor->reset(owned, tag, val); + // TODO: In some cases we can avoid calling seek() on the row store cursor, and instead do + // a next() which should be much cheaper. + auto record = _rowStoreCursor->seekExact(_recordId); + + // If there's no record, the index is out of sync with the row store. + invariant(record); + + _rowStoreAccessor->reset(false, + value::TypeTags::bsonObject, + value::bitcastFrom<const char*>(record->data.data())); + + if (_recordExpr) { + auto [owned, tag, val] = _bytecode.run(_recordExprCode.get()); + _recordAccessor->reset(owned, tag, val); + } + } else { + _recordAccessor->reset(true, outTag, outVal); + materializedObjGuard.reset(); + } + + if (_recordIdAccessor) { + _recordIdAccessor->reset( + false, value::TypeTags::RecordId, value::bitcastFrom<RecordId*>(&_recordId)); } for (size_t idx = 0; idx < _outputFields.size(); ++idx) { @@ -267,8 +417,10 @@ void ColumnScanStage::close() { auto optTimer(getOptTimer(_opCtx)); trackClose(); - _cursor.reset(); + _rowStoreCursor.reset(); _coll.reset(); + _columnCursors.clear(); + _parentPathCursors.clear(); _open = false; } @@ -348,6 +500,5 @@ size_t ColumnScanStage::estimateCompileTimeSize() const { size += size_estimator::estimate(_specificStats); return size; } - } // namespace sbe } // namespace mongo diff --git a/src/mongo/db/exec/sbe/stages/column_scan.h b/src/mongo/db/exec/sbe/stages/column_scan.h index 22ca42ee401..e555d10a114 100644 --- a/src/mongo/db/exec/sbe/stages/column_scan.h +++ b/src/mongo/db/exec/sbe/stages/column_scan.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/config.h" +#include "mongo/db/exec/fake_column_cursor.h" #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/collection_helpers.h" #include "mongo/db/exec/sbe/stages/stages.h" @@ -77,6 +78,36 @@ protected: TrialRunTracker* tracker, TrialRunTrackerAttachResultMask childrenAttachResult) override; private: + struct ColumnCursor { + std::unique_ptr<FakeCursorForPath> cursor; + boost::optional<FakeCell> lastCell; + bool includeInOutput = false; + + boost::optional<FakeCell>& next() { + // TODO For some reason the destructor of 'lastCell' is not called + // on my local asan build unless we explicitly reset it. Maybe + // the same compiler bug Nikita ran into? + lastCell.reset(); + lastCell = cursor->next(); + return lastCell; + } + + boost::optional<FakeCell>& seekAtOrPast(RecordId id) { + lastCell.reset(); + lastCell = cursor->seekAtOrPast(id); + return lastCell; + } + + const PathValue& path() const { + return cursor->path(); + } + }; + + void readParentsIntoObj(StringData path, + value::Object* out, + StringDataSet* pathsReadSetOut, + bool first = true); + const UUID _collUuid; const std::string _columnIndexName; const value::SlotVector _fieldSlots; @@ -109,7 +140,10 @@ private: CollectionPtr _coll; - std::unique_ptr<SeekableRecordCursor> _cursor; + std::unique_ptr<SeekableRecordCursor> _rowStoreCursor; + + std::vector<ColumnCursor> _columnCursors; + StringMap<std::unique_ptr<FakeCursorForPath>> _parentPathCursors; RecordId _recordId; |