diff options
author | Martin Neupauer <xmaton@messengeruser.com> | 2021-04-21 09:38:49 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-29 21:49:53 +0000 |
commit | a1dd77a37ab054564904ce8d0f04dacca04a9d2a (patch) | |
tree | 04cfe21a3419c9bf0d3a101be27fe326db0a8342 | |
parent | 7fe73c9955e44c7e99c5c3487454bb2ad64740e1 (diff) | |
download | mongo-a1dd77a37ab054564904ce8d0f04dacca04a9d2a.tar.gz |
SERVER-55498 Prevent SBE from using unowned values from storage after
yield
This change ensures that accesors produce valid values after yield.
44 files changed, 415 insertions, 120 deletions
diff --git a/src/mongo/db/exec/sbe/parser/parser.cpp b/src/mongo/db/exec/sbe/parser/parser.cpp index 0a8efc14ab7..412162abd10 100644 --- a/src/mongo/db/exec/sbe/parser/parser.cpp +++ b/src/mongo/db/exec/sbe/parser/parser.cpp @@ -678,7 +678,7 @@ void Parser::walkScan(AstQuery& ast) { lookupSlots(ast.nodes[projectsPos]->renames), boost::none, forward, - nullptr, + _yieldPolicy, getCurrentPlanNodeId(), ScanCallbacks({})); } @@ -729,7 +729,7 @@ void Parser::walkParallelScan(AstQuery& ast) { lookupSlot(indexKeyPatternName), ast.nodes[projectsPos]->identifiers, lookupSlots(ast.nodes[projectsPos]->renames), - nullptr, + _yieldPolicy, getCurrentPlanNodeId(), ScanCallbacks({})); } @@ -792,7 +792,7 @@ void Parser::walkSeek(AstQuery& ast) { lookupSlots(ast.nodes[projectsPos]->renames), lookupSlot(ast.nodes[0]->identifier), forward, - nullptr, + _yieldPolicy, getCurrentPlanNodeId(), ScanCallbacks({})); } @@ -853,7 +853,7 @@ void Parser::walkIndexScan(AstQuery& ast) { vars, boost::none, boost::none, - nullptr, + _yieldPolicy, getCurrentPlanNodeId(), LockAcquisitionCallback{}); } @@ -914,7 +914,7 @@ void Parser::walkIndexSeek(AstQuery& ast) { vars, lookupSlot(ast.nodes[0]->identifier), lookupSlot(ast.nodes[1]->identifier), - nullptr, + _yieldPolicy, getCurrentPlanNodeId(), LockAcquisitionCallback{}); } @@ -1209,7 +1209,7 @@ void Parser::walkSkip(AstQuery& ast) { void Parser::walkCoScan(AstQuery& ast) { walkChildren(ast); - ast.stage = makeS<CoScanStage>(getCurrentPlanNodeId()); + ast.stage = makeS<CoScanStage>(getCurrentPlanNodeId(), _yieldPolicy); } void Parser::walkTraverse(AstQuery& ast) { @@ -1847,10 +1847,12 @@ Parser::Parser(RuntimeEnvironment* env) : _env(env) { std::unique_ptr<PlanStage> Parser::parse(OperationContext* opCtx, StringData defaultDb, - StringData line) { + StringData line, + PlanYieldPolicy* yieldPolicy) { std::shared_ptr<AstQuery> ast; _opCtx = opCtx; + _yieldPolicy = yieldPolicy; _defaultDb = defaultDb.toString(); auto result = _parser.parse_n(line.rawData(), line.size(), ast); diff --git a/src/mongo/db/exec/sbe/parser/parser.h b/src/mongo/db/exec/sbe/parser/parser.h index dc99252bc4c..8bc47abf56a 100644 --- a/src/mongo/db/exec/sbe/parser/parser.h +++ b/src/mongo/db/exec/sbe/parser/parser.h @@ -42,6 +42,7 @@ #include "mongo/db/query/sbe_stage_builder.h" namespace mongo { +class PlanYieldPolicy; namespace sbe { struct ParsedQueryTree { std::string identifier; @@ -66,7 +67,8 @@ public: Parser(RuntimeEnvironment* env); std::unique_ptr<PlanStage> parse(OperationContext* opCtx, StringData defaultDb, - StringData line); + StringData line, + PlanYieldPolicy* yieldPolicy = nullptr); std::pair<boost::optional<value::SlotId>, boost::optional<value::SlotId>> getTopLevelSlots() const { @@ -78,6 +80,7 @@ private: using SpoolBufferLookupTable = std::map<std::string, SpoolId>; peg::parser _parser; OperationContext* _opCtx{nullptr}; + PlanYieldPolicy* _yieldPolicy{nullptr}; std::string _defaultDb; SymbolTable _symbolsLookupTable; SpoolBufferLookupTable _spoolBuffersLookupTable; diff --git a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp index 6eae9c7854f..990e487a982 100644 --- a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp @@ -142,7 +142,7 @@ TEST_F(SBEKeyStringTest, Basic) { bsonObjAccessor.reset(value::TypeTags::bsonObject, value::bitcastFrom<const char*>(testValues.objdata())); - std::vector<sbe::value::ViewOfValueAccessor> keyStringValues; + std::vector<sbe::value::OwnedValueAccessor> keyStringValues; BufBuilder builder; for (auto&& element : testValues) { while (keyStringValues.empty()) { @@ -185,7 +185,7 @@ TEST(SBEKeyStringTest, KeyComponentInclusion) { indexKeysToInclude.set(0); indexKeysToInclude.set(2); - std::vector<value::ViewOfValueAccessor> accessors; + std::vector<value::OwnedValueAccessor> accessors; accessors.resize(2); BufBuilder builder; diff --git a/src/mongo/db/exec/sbe/stages/branch.cpp b/src/mongo/db/exec/sbe/stages/branch.cpp index 6ed2547ec60..dc69c6012a1 100644 --- a/src/mongo/db/exec/sbe/stages/branch.cpp +++ b/src/mongo/db/exec/sbe/stages/branch.cpp @@ -156,7 +156,7 @@ PlanState BranchStage::getNext() { void BranchStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); if (_thenOpened) { _children[0]->close(); diff --git a/src/mongo/db/exec/sbe/stages/bson_scan.cpp b/src/mongo/db/exec/sbe/stages/bson_scan.cpp index bc94f504b4c..430977f3ac4 100644 --- a/src/mongo/db/exec/sbe/stages/bson_scan.cpp +++ b/src/mongo/db/exec/sbe/stages/bson_scan.cpp @@ -136,7 +136,7 @@ PlanState BSONScanStage::getNext() { void BSONScanStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); } std::unique_ptr<PlanStageStats> BSONScanStage::getStats(bool includeDebugInfo) const { diff --git a/src/mongo/db/exec/sbe/stages/bson_scan.h b/src/mongo/db/exec/sbe/stages/bson_scan.h index d0d17856a50..905cc8f7e03 100644 --- a/src/mongo/db/exec/sbe/stages/bson_scan.h +++ b/src/mongo/db/exec/sbe/stages/bson_scan.h @@ -66,7 +66,7 @@ private: std::unique_ptr<value::ViewOfValueAccessor> _recordAccessor; - value::FieldAccessorMap _fieldAccessors; + value::FieldViewAccessorMap _fieldAccessors; value::SlotAccessorMap _varAccessors; const char* _bsonCurrent; diff --git a/src/mongo/db/exec/sbe/stages/check_bounds.cpp b/src/mongo/db/exec/sbe/stages/check_bounds.cpp index c1d4707e10a..b80e197cb10 100644 --- a/src/mongo/db/exec/sbe/stages/check_bounds.cpp +++ b/src/mongo/db/exec/sbe/stages/check_bounds.cpp @@ -82,6 +82,9 @@ PlanState CheckBoundsStage::getNext() { return trackPlanState(PlanState::IS_EOF); } + // We are about to call getNext() on our child so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the getNext() call. + disableSlotAccess(); auto state = _children[0]->getNext(); if (state == PlanState::ADVANCED) { @@ -130,7 +133,7 @@ PlanState CheckBoundsStage::getNext() { void CheckBoundsStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } @@ -166,4 +169,12 @@ std::vector<DebugPrinter::Block> CheckBoundsStage::debugPrint() const { DebugPrinter::addBlocks(ret, _children[0]->debugPrint()); return ret; } + +void CheckBoundsStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + _outAccessor.makeOwned(); +} } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/check_bounds.h b/src/mongo/db/exec/sbe/stages/check_bounds.h index 21971a612eb..c4ccdf7d37e 100644 --- a/src/mongo/db/exec/sbe/stages/check_bounds.h +++ b/src/mongo/db/exec/sbe/stages/check_bounds.h @@ -84,6 +84,9 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + private: const CheckBoundsParams _params; IndexBoundsChecker _checker; diff --git a/src/mongo/db/exec/sbe/stages/co_scan.cpp b/src/mongo/db/exec/sbe/stages/co_scan.cpp index e0b1cdc9dac..86e460ee208 100644 --- a/src/mongo/db/exec/sbe/stages/co_scan.cpp +++ b/src/mongo/db/exec/sbe/stages/co_scan.cpp @@ -34,7 +34,8 @@ #include "mongo/db/exec/sbe/expressions/expression.h" namespace mongo::sbe { -CoScanStage::CoScanStage(PlanNodeId planNodeId) : PlanStage("coscan"_sd, planNodeId) {} +CoScanStage::CoScanStage(PlanNodeId planNodeId, PlanYieldPolicy* yieldPolicy) + : PlanStage("coscan"_sd, yieldPolicy, planNodeId) {} std::unique_ptr<PlanStage> CoScanStage::clone() const { return std::make_unique<CoScanStage>(_commonStats.nodeId); @@ -71,7 +72,7 @@ const SpecificStats* CoScanStage::getSpecificStats() const { void CoScanStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); } } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/co_scan.h b/src/mongo/db/exec/sbe/stages/co_scan.h index f93dd4da699..eb678034740 100644 --- a/src/mongo/db/exec/sbe/stages/co_scan.h +++ b/src/mongo/db/exec/sbe/stages/co_scan.h @@ -42,7 +42,7 @@ namespace mongo::sbe { */ class CoScanStage final : public PlanStage { public: - explicit CoScanStage(PlanNodeId); + explicit CoScanStage(PlanNodeId, PlanYieldPolicy* yieldPolicy = nullptr); std::unique_ptr<PlanStage> clone() const final; diff --git a/src/mongo/db/exec/sbe/stages/exchange.cpp b/src/mongo/db/exec/sbe/stages/exchange.cpp index b6e6602ce69..9f4e94b970d 100644 --- a/src/mongo/db/exec/sbe/stages/exchange.cpp +++ b/src/mongo/db/exec/sbe/stages/exchange.cpp @@ -327,7 +327,7 @@ PlanState ExchangeConsumer::getNext() { void ExchangeConsumer::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); { stdx::unique_lock lock(_state->consumerCloseMutex()); @@ -566,7 +566,7 @@ PlanState ExchangeProducer::getNext() { void ExchangeProducer::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } diff --git a/src/mongo/db/exec/sbe/stages/filter.h b/src/mongo/db/exec/sbe/stages/filter.h index a3ee0355146..8912c47a12d 100644 --- a/src/mongo/db/exec/sbe/stages/filter.h +++ b/src/mongo/db/exec/sbe/stages/filter.h @@ -124,7 +124,7 @@ public: void close() final { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); if (_childOpened) { _children[0]->close(); diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index b4987417aa6..90cae74c3ca 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -202,7 +202,7 @@ const SpecificStats* HashAggStage::getSpecificStats() const { void HashAggStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _ht = boost::none; } diff --git a/src/mongo/db/exec/sbe/stages/hash_join.cpp b/src/mongo/db/exec/sbe/stages/hash_join.cpp index 190b8d7dd05..03f455414e6 100644 --- a/src/mongo/db/exec/sbe/stages/hash_join.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_join.cpp @@ -210,7 +210,7 @@ PlanState HashJoinStage::getNext() { void HashJoinStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[1]->close(); _ht = boost::none; } diff --git a/src/mongo/db/exec/sbe/stages/ix_scan.cpp b/src/mongo/db/exec/sbe/stages/ix_scan.cpp index 08aea2accf3..be0c6686034 100644 --- a/src/mongo/db/exec/sbe/stages/ix_scan.cpp +++ b/src/mongo/db/exec/sbe/stages/ix_scan.cpp @@ -88,11 +88,11 @@ std::unique_ptr<PlanStage> IndexScanStage::clone() const { void IndexScanStage::prepare(CompileCtx& ctx) { if (_recordSlot) { - _recordAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordAccessor = std::make_unique<value::OwnedValueAccessor>(); } if (_recordIdSlot) { - _recordIdAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordIdAccessor = std::make_unique<value::OwnedValueAccessor>(); } if (_snapshotIdSlot) { @@ -110,7 +110,9 @@ void IndexScanStage::prepare(CompileCtx& ctx) { } if (_seekKeySlotHigh) { _seekKeyHiAccessor = ctx.getAccessor(*_seekKeySlotHigh); + _seekKeyHighHolder = std::make_unique<value::OwnedValueAccessor>(); } + _seekKeyLowHolder = std::make_unique<value::OwnedValueAccessor>(); std::tie(_collName, _catalogEpoch) = acquireCollection(_opCtx, _collUuid, _lockAcquisitionCallback, _coll); @@ -156,6 +158,29 @@ value::SlotAccessor* IndexScanStage::getAccessor(CompileCtx& ctx, value::SlotId } void IndexScanStage::doSaveState() { + if (slotsAccessible()) { + if (_recordAccessor) { + _recordAccessor->makeOwned(); + } + if (_recordIdAccessor) { + _recordIdAccessor->makeOwned(); + } + for (auto& accessor : _accessors) { + accessor.makeOwned(); + } + } + + // Seek points are external to the index scan and must be accessible no matter what as long as + // the index scan is opened. + if (_open) { + if (_seekKeyLowHolder) { + _seekKeyLowHolder->makeOwned(); + } + if (_seekKeyHighHolder) { + _seekKeyHighHolder->makeOwned(); + } + } + if (_cursor) { _cursor->save(); } @@ -252,39 +277,56 @@ void IndexScanStage::open(bool reOpen) { uassert(4822851, str::stream() << "seek key is wrong type: " << msgTagLow, tagLow == value::TypeTags::ksValue); - _seekKeyLow = value::getKeyStringView(valLow); + _seekKeyLowHolder->reset(false, tagLow, valLow); auto [tagHi, valHi] = _seekKeyHiAccessor->getViewOfValue(); const auto msgTagHi = tagHi; uassert(4822852, str::stream() << "seek key is wrong type: " << msgTagHi, tagHi == value::TypeTags::ksValue); - _seekKeyHi = value::getKeyStringView(valHi); + + _seekKeyHighHolder->reset(false, tagHi, valHi); } else if (_seekKeyLowAccessor) { auto [tagLow, valLow] = _seekKeyLowAccessor->getViewOfValue(); const auto msgTagLow = tagLow; uassert(4822853, str::stream() << "seek key is wrong type: " << msgTagLow, tagLow == value::TypeTags::ksValue); - _seekKeyLow = value::getKeyStringView(valLow); - _seekKeyHi = nullptr; + _seekKeyLowHolder->reset(false, tagLow, valLow); } else { auto sdi = entry->accessMethod()->getSortedDataInterface(); KeyString::Builder kb(sdi->getKeyStringVersion(), sdi->getOrdering(), KeyString::Discriminator::kExclusiveBefore); kb.appendDiscriminator(KeyString::Discriminator::kExclusiveBefore); - _startPoint = kb.getValueCopy(); - _seekKeyLow = &_startPoint; - _seekKeyHi = nullptr; + auto [copyTag, copyVal] = value::makeCopyKeyString(kb.getValueCopy()); + _seekKeyLowHolder->reset(true, copyTag, copyVal); } + ++_specificStats.seeks; } +const KeyString::Value& IndexScanStage::getSeekKeyLow() const { + auto [tag, value] = _seekKeyLowHolder->getViewOfValue(); + return *value::getKeyStringView(value); +} + +const KeyString::Value* IndexScanStage::getSeekKeyHigh() const { + if (!_seekKeyHighHolder) { + return nullptr; + } + auto [tag, value] = _seekKeyHighHolder->getViewOfValue(); + return value::getKeyStringView(value); +} + PlanState IndexScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to get next record from a storage cursor so do not bother saving our internal + // state in case it yields as the state will be completely overwritten after the call. + disableSlotAccess(); + if (!_cursor) { return trackPlanState(PlanState::IS_EOF); } @@ -293,7 +335,7 @@ PlanState IndexScanStage::getNext() { if (_firstGetNext) { _firstGetNext = false; - _nextRecord = _cursor->seekForKeyString(*_seekKeyLow); + _nextRecord = _cursor->seekForKeyString(getSeekKeyLow()); } else { _nextRecord = _cursor->nextKeyString(); } @@ -302,8 +344,8 @@ PlanState IndexScanStage::getNext() { return trackPlanState(PlanState::IS_EOF); } - if (_seekKeyHi) { - auto cmp = _nextRecord->keyString.compare(*_seekKeyHi); + if (auto seekKeyHigh = getSeekKeyHigh(); seekKeyHigh) { + auto cmp = _nextRecord->keyString.compare(*seekKeyHigh); if (_forward) { if (cmp > 0) { @@ -317,12 +359,14 @@ PlanState IndexScanStage::getNext() { } if (_recordAccessor) { - _recordAccessor->reset(value::TypeTags::ksValue, + _recordAccessor->reset(false, + value::TypeTags::ksValue, value::bitcastFrom<KeyString::Value*>(&_nextRecord->keyString)); } if (_recordIdAccessor) { - _recordIdAccessor->reset(value::TypeTags::RecordId, + _recordIdAccessor->reset(false, + value::TypeTags::RecordId, value::bitcastFrom<int64_t>(_nextRecord->loc.getLong())); } @@ -348,7 +392,7 @@ PlanState IndexScanStage::getNext() { void IndexScanStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _cursor.reset(); _coll.reset(); diff --git a/src/mongo/db/exec/sbe/stages/ix_scan.h b/src/mongo/db/exec/sbe/stages/ix_scan.h index 35032a1c0cd..00e3dd16bfb 100644 --- a/src/mongo/db/exec/sbe/stages/ix_scan.h +++ b/src/mongo/db/exec/sbe/stages/ix_scan.h @@ -102,6 +102,9 @@ private: */ void restoreCollectionAndIndex(); + const KeyString::Value& getSeekKeyLow() const; + const KeyString::Value* getSeekKeyHigh() const; + const CollectionUUID _collUuid; const std::string _indexName; const bool _forward; @@ -118,21 +121,20 @@ private: LockAcquisitionCallback _lockAcquisitionCallback; - std::unique_ptr<value::ViewOfValueAccessor> _recordAccessor; - std::unique_ptr<value::ViewOfValueAccessor> _recordIdAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordIdAccessor; std::unique_ptr<value::OwnedValueAccessor> _snapshotIdAccessor; // One accessor and slot for each key component that this stage will bind from an index entry's // KeyString. The accessors are in the same order as the key components they bind to. - std::vector<value::ViewOfValueAccessor> _accessors; + std::vector<value::OwnedValueAccessor> _accessors; value::SlotAccessorMap _accessorMap; value::SlotAccessor* _seekKeyLowAccessor{nullptr}; value::SlotAccessor* _seekKeyHiAccessor{nullptr}; - KeyString::Value _startPoint; - KeyString::Value* _seekKeyLow{nullptr}; - KeyString::Value* _seekKeyHi{nullptr}; + std::unique_ptr<value::OwnedValueAccessor> _seekKeyLowHolder; + std::unique_ptr<value::OwnedValueAccessor> _seekKeyHighHolder; std::unique_ptr<SortedDataInterface::Cursor> _cursor; std::weak_ptr<const IndexCatalogEntry> _weakIndexCatalogEntry; diff --git a/src/mongo/db/exec/sbe/stages/limit_skip.cpp b/src/mongo/db/exec/sbe/stages/limit_skip.cpp index b09e3e17a9b..7e2206b8023 100644 --- a/src/mongo/db/exec/sbe/stages/limit_skip.cpp +++ b/src/mongo/db/exec/sbe/stages/limit_skip.cpp @@ -86,7 +86,7 @@ PlanState LimitSkipStage::getNext() { void LimitSkipStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } diff --git a/src/mongo/db/exec/sbe/stages/loop_join.cpp b/src/mongo/db/exec/sbe/stages/loop_join.cpp index 98f0f3e0085..290d8523572 100644 --- a/src/mongo/db/exec/sbe/stages/loop_join.cpp +++ b/src/mongo/db/exec/sbe/stages/loop_join.cpp @@ -149,7 +149,7 @@ PlanState LoopJoinStage::getNext() { void LoopJoinStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); if (_reOpenInner) { _children[1]->close(); diff --git a/src/mongo/db/exec/sbe/stages/makeobj.cpp b/src/mongo/db/exec/sbe/stages/makeobj.cpp index 2c46aa48f80..ce571bd47d7 100644 --- a/src/mongo/db/exec/sbe/stages/makeobj.cpp +++ b/src/mongo/db/exec/sbe/stages/makeobj.cpp @@ -359,6 +359,9 @@ template <MakeObjOutputType O> PlanState MakeObjStageBase<O>::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to call getNext() on our child so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the getNext() call. + disableSlotAccess(); auto state = _children[0]->getNext(); if (state == PlanState::ADVANCED) { @@ -371,7 +374,7 @@ template <MakeObjOutputType O> void MakeObjStageBase<O>::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } @@ -448,6 +451,15 @@ std::vector<DebugPrinter::Block> MakeObjStageBase<O>::debugPrint() const { return ret; } +template <MakeObjOutputType O> +void MakeObjStageBase<O>::doSaveState() { + if (!slotsAccessible()) { + return; + } + + _obj.makeOwned(); +} + // Explicit template instantiations. template class MakeObjStageBase<MakeObjOutputType::object>; template class MakeObjStageBase<MakeObjOutputType::bsonObject>; diff --git a/src/mongo/db/exec/sbe/stages/makeobj.h b/src/mongo/db/exec/sbe/stages/makeobj.h index 10dd950bcf6..9901ad17744 100644 --- a/src/mongo/db/exec/sbe/stages/makeobj.h +++ b/src/mongo/db/exec/sbe/stages/makeobj.h @@ -91,6 +91,9 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + private: void projectField(value::Object* obj, size_t idx); void projectField(UniqueBSONObjBuilder* bob, size_t idx); diff --git a/src/mongo/db/exec/sbe/stages/merge_join.cpp b/src/mongo/db/exec/sbe/stages/merge_join.cpp index ad9d1705903..191d5efe3aa 100644 --- a/src/mongo/db/exec/sbe/stages/merge_join.cpp +++ b/src/mongo/db/exec/sbe/stages/merge_join.cpp @@ -45,7 +45,8 @@ value::MaterializedRow materializeCopyOfRow(std::vector<value::SlotAccessor*>& a size_t idx = 0; for (auto& accessor : accessors) { - auto [tag, val] = accessor->copyOrMoveValue(); + auto [tag, val] = accessor->getViewOfValue(); + std::tie(tag, val) = value::copyValue(tag, val); row.reset(idx++, true, tag, val); } return row; @@ -314,12 +315,22 @@ PlanState MergeJoinStage::getNext() { void MergeJoinStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); _children[1]->close(); _outerProjectsBuffer.clear(); } +void MergeJoinStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + // We only have to save shallow non-owning materialized rows. + _currentOuterKey.makeOwned(); + _currentInnerKey.makeOwned(); +} + std::unique_ptr<PlanStageStats> MergeJoinStage::getStats(bool includeDebugInfo) const { auto ret = std::make_unique<PlanStageStats>(_commonStats); diff --git a/src/mongo/db/exec/sbe/stages/merge_join.h b/src/mongo/db/exec/sbe/stages/merge_join.h index 34f158f220c..fefac37aeb6 100644 --- a/src/mongo/db/exec/sbe/stages/merge_join.h +++ b/src/mongo/db/exec/sbe/stages/merge_join.h @@ -65,6 +65,9 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + private: using MergeJoinBuffer = std::vector<value::MaterializedRow>; using MergeJoinBufferAccessor = value::MaterializedRowAccessor<MergeJoinBuffer>; diff --git a/src/mongo/db/exec/sbe/stages/project.cpp b/src/mongo/db/exec/sbe/stages/project.cpp index 3520ca4c603..d2a2ffb5fe1 100644 --- a/src/mongo/db/exec/sbe/stages/project.cpp +++ b/src/mongo/db/exec/sbe/stages/project.cpp @@ -78,6 +78,9 @@ void ProjectStage::open(bool reOpen) { PlanState ProjectStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to call getNext() on our child so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the getNext() call. + disableSlotAccess(); auto state = _children[0]->getNext(); if (state == PlanState::ADVANCED) { @@ -96,7 +99,7 @@ PlanState ProjectStage::getNext() { void ProjectStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } @@ -141,5 +144,17 @@ std::vector<DebugPrinter::Block> ProjectStage::debugPrint() const { DebugPrinter::addBlocks(ret, _children[0]->debugPrint()); return ret; } + +void ProjectStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + for (auto& [slotId, codeAndAccessor] : _fields) { + auto& [code, accessor] = codeAndAccessor; + accessor.makeOwned(); + } +} + } // namespace sbe } // namespace mongo diff --git a/src/mongo/db/exec/sbe/stages/project.h b/src/mongo/db/exec/sbe/stages/project.h index 08b8e6e2f89..ae13653a8a1 100644 --- a/src/mongo/db/exec/sbe/stages/project.h +++ b/src/mongo/db/exec/sbe/stages/project.h @@ -52,6 +52,9 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + private: const value::SlotMap<std::unique_ptr<EExpression>> _projects; value::SlotMap<std::pair<std::unique_ptr<vm::CodeFragment>, value::OwnedValueAccessor>> _fields; diff --git a/src/mongo/db/exec/sbe/stages/scan.cpp b/src/mongo/db/exec/sbe/stages/scan.cpp index 1858b7fd8e6..791fe61e5f8 100644 --- a/src/mongo/db/exec/sbe/stages/scan.cpp +++ b/src/mongo/db/exec/sbe/stages/scan.cpp @@ -97,16 +97,16 @@ std::unique_ptr<PlanStage> ScanStage::clone() const { void ScanStage::prepare(CompileCtx& ctx) { if (_recordSlot) { - _recordAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordAccessor = std::make_unique<value::OwnedValueAccessor>(); } if (_recordIdSlot) { - _recordIdAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordIdAccessor = std::make_unique<value::OwnedValueAccessor>(); } for (size_t idx = 0; idx < _fields.size(); ++idx) { auto [it, inserted] = - _fieldAccessors.emplace(_fields[idx], std::make_unique<value::ViewOfValueAccessor>()); + _fieldAccessors.emplace(_fields[idx], std::make_unique<value::OwnedValueAccessor>()); uassert(4822814, str::stream() << "duplicate field: " << _fields[idx], inserted); auto [itRename, insertedRename] = _varAccessors.emplace(_vars[idx], it->second.get()); uassert(4822815, str::stream() << "duplicate field: " << _vars[idx], insertedRename); @@ -161,6 +161,18 @@ value::SlotAccessor* ScanStage::getAccessor(CompileCtx& ctx, value::SlotId slot) } void ScanStage::doSaveState() { + if (slotsAccessible()) { + if (_recordAccessor) { + _recordAccessor->makeOwned(); + } + if (_recordIdAccessor) { + _recordIdAccessor->makeOwned(); + } + for (auto& [fieldName, accessor] : _fieldAccessors) { + accessor->makeOwned(); + } + } + if (_cursor) { _cursor->save(); } @@ -263,6 +275,10 @@ void ScanStage::open(bool reOpen) { PlanState ScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to call next() on a storage cursor so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the next() call. + disableSlotAccess(); + if (!_cursor) { return trackPlanState(PlanState::IS_EOF); } @@ -299,12 +315,14 @@ PlanState ScanStage::getNext() { } if (_recordAccessor) { - _recordAccessor->reset(value::TypeTags::bsonObject, + _recordAccessor->reset(false, + value::TypeTags::bsonObject, value::bitcastFrom<const char*>(nextRecord->data.data())); } if (_recordIdAccessor) { - _recordIdAccessor->reset(value::TypeTags::RecordId, + _recordIdAccessor->reset(false, + value::TypeTags::RecordId, value::bitcastFrom<int64_t>(nextRecord->id.getLong())); } @@ -327,7 +345,7 @@ PlanState ScanStage::getNext() { _oplogTsAccessor->reset(false, ownedTag, ownedVal); } - it->second->reset(tag, val); + it->second->reset(false, tag, val); if ((--fieldsToMatch) == 0) { // No need to scan any further so bail out early. @@ -355,7 +373,7 @@ PlanState ScanStage::getNext() { void ScanStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _cursor.reset(); _coll.reset(); _open = false; @@ -541,16 +559,16 @@ std::unique_ptr<PlanStage> ParallelScanStage::clone() const { void ParallelScanStage::prepare(CompileCtx& ctx) { if (_recordSlot) { - _recordAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordAccessor = std::make_unique<value::OwnedValueAccessor>(); } if (_recordIdSlot) { - _recordIdAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _recordIdAccessor = std::make_unique<value::OwnedValueAccessor>(); } for (size_t idx = 0; idx < _fields.size(); ++idx) { auto [it, inserted] = - _fieldAccessors.emplace(_fields[idx], std::make_unique<value::ViewOfValueAccessor>()); + _fieldAccessors.emplace(_fields[idx], std::make_unique<value::OwnedValueAccessor>()); uassert(4822816, str::stream() << "duplicate field: " << _fields[idx], inserted); auto [itRename, insertedRename] = _varAccessors.emplace(_vars[idx], it->second.get()); uassert(4822817, str::stream() << "duplicate field: " << _vars[idx], insertedRename); @@ -592,6 +610,18 @@ value::SlotAccessor* ParallelScanStage::getAccessor(CompileCtx& ctx, value::Slot } void ParallelScanStage::doSaveState() { + if (slotsAccessible()) { + if (_recordAccessor) { + _recordAccessor->makeOwned(); + } + if (_recordIdAccessor) { + _recordIdAccessor->makeOwned(); + } + for (auto& [fieldName, accessor] : _fieldAccessors) { + accessor->makeOwned(); + } + } + if (_cursor) { _cursor->save(); } @@ -697,6 +727,10 @@ boost::optional<Record> ParallelScanStage::nextRange() { PlanState ParallelScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to call next() on a storage cursor so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the next() call. + disableSlotAccess(); + if (!_cursor) { return trackPlanState(PlanState::IS_EOF); } @@ -740,12 +774,14 @@ PlanState ParallelScanStage::getNext() { } while (!nextRecord); if (_recordAccessor) { - _recordAccessor->reset(value::TypeTags::bsonObject, + _recordAccessor->reset(false, + value::TypeTags::bsonObject, value::bitcastFrom<const char*>(nextRecord->data.data())); } if (_recordIdAccessor) { - _recordIdAccessor->reset(value::TypeTags::RecordId, + _recordIdAccessor->reset(false, + value::TypeTags::RecordId, value::bitcastFrom<int64_t>(nextRecord->id.getLong())); } @@ -764,7 +800,7 @@ PlanState ParallelScanStage::getNext() { // Found the field so convert it to Value. auto [tag, val] = bson::convertFrom(true, be, end, sv.size()); - it->second->reset(tag, val); + it->second->reset(false, tag, val); if ((--fieldsToMatch) == 0) { // No need to scan any further so bail out early. @@ -782,6 +818,7 @@ PlanState ParallelScanStage::getNext() { void ParallelScanStage::close() { auto optTimer(getOptTimer(_opCtx)); + trackClose(); _cursor.reset(); _coll.reset(); _open = false; diff --git a/src/mongo/db/exec/sbe/stages/scan.h b/src/mongo/db/exec/sbe/stages/scan.h index f54e8525fff..dca4da1c496 100644 --- a/src/mongo/db/exec/sbe/stages/scan.h +++ b/src/mongo/db/exec/sbe/stages/scan.h @@ -118,8 +118,8 @@ private: const ScanCallbacks _scanCallbacks; - std::unique_ptr<value::ViewOfValueAccessor> _recordAccessor; - std::unique_ptr<value::ViewOfValueAccessor> _recordIdAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordIdAccessor; value::SlotAccessor* _snapshotIdAccessor{nullptr}; value::SlotAccessor* _indexIdAccessor{nullptr}; value::SlotAccessor* _indexKeyAccessor{nullptr}; @@ -223,8 +223,8 @@ private: const ScanCallbacks _scanCallbacks; - std::unique_ptr<value::ViewOfValueAccessor> _recordAccessor; - std::unique_ptr<value::ViewOfValueAccessor> _recordIdAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordAccessor; + std::unique_ptr<value::OwnedValueAccessor> _recordIdAccessor; value::SlotAccessor* _snapshotIdAccessor{nullptr}; value::SlotAccessor* _indexIdAccessor{nullptr}; value::SlotAccessor* _indexKeyAccessor{nullptr}; diff --git a/src/mongo/db/exec/sbe/stages/sort.cpp b/src/mongo/db/exec/sbe/stages/sort.cpp index 647f0dd346a..b1ba46b8014 100644 --- a/src/mongo/db/exec/sbe/stages/sort.cpp +++ b/src/mongo/db/exec/sbe/stages/sort.cpp @@ -225,7 +225,7 @@ PlanState SortStage::getNext() { void SortStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _mergeIt.reset(); _sorter.reset(); } diff --git a/src/mongo/db/exec/sbe/stages/sorted_merge.cpp b/src/mongo/db/exec/sbe/stages/sorted_merge.cpp index c978c6136de..f44d3e436fa 100644 --- a/src/mongo/db/exec/sbe/stages/sorted_merge.cpp +++ b/src/mongo/db/exec/sbe/stages/sorted_merge.cpp @@ -130,7 +130,7 @@ PlanState SortedMergeStage::getNext() { } void SortedMergeStage::close() { - ++_commonStats.closes; + trackClose(); for (auto& child : _children) { child->close(); } diff --git a/src/mongo/db/exec/sbe/stages/spool.cpp b/src/mongo/db/exec/sbe/stages/spool.cpp index f76b913358e..9e1f66e71f9 100644 --- a/src/mongo/db/exec/sbe/stages/spool.cpp +++ b/src/mongo/db/exec/sbe/stages/spool.cpp @@ -117,7 +117,7 @@ PlanState SpoolEagerProducerStage::getNext() { void SpoolEagerProducerStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _buffer->clear(); } @@ -196,7 +196,7 @@ void SpoolLazyProducerStage::prepare(CompileCtx& ctx) { uassert(4822811, str::stream() << "duplicate field: " << slot, inserted); _inAccessors.emplace_back(_children[0]->getAccessor(ctx, slot)); - _outAccessors.emplace(slot, value::ViewOfValueAccessor{}); + _outAccessors.emplace(slot, value::OwnedValueAccessor{}); } _compiled = true; @@ -228,6 +228,9 @@ void SpoolLazyProducerStage::open(bool reOpen) { PlanState SpoolLazyProducerStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + // We are about to call getNext() on our child so do not bother saving our internal state in + // case it yields as the state will be completely overwritten after the getNext() call. + disableSlotAccess(); auto state = _children[0]->getNext(); if (state == PlanState::ADVANCED) { @@ -244,7 +247,7 @@ PlanState SpoolLazyProducerStage::getNext() { for (size_t idx = 0; idx < _inAccessors.size(); ++idx) { auto [tag, val] = _inAccessors[idx]->getViewOfValue(); - _outAccessors[_vals[idx]].reset(tag, val); + _outAccessors[_vals[idx]].reset(false, tag, val); auto [copyTag, copyVal] = value::copyValue(tag, val); vals.reset(idx, true, copyTag, copyVal); @@ -255,7 +258,7 @@ PlanState SpoolLazyProducerStage::getNext() { // Otherwise, just pass through the input values. for (size_t idx = 0; idx < _inAccessors.size(); ++idx) { auto [tag, val] = _inAccessors[idx]->getViewOfValue(); - _outAccessors[_vals[idx]].reset(tag, val); + _outAccessors[_vals[idx]].reset(false, tag, val); } } } @@ -263,9 +266,20 @@ PlanState SpoolLazyProducerStage::getNext() { return trackPlanState(state); } +void SpoolLazyProducerStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + for (auto& [slot, accessor] : _outAccessors) { + accessor.makeOwned(); + } +} + void SpoolLazyProducerStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + + trackClose(); _buffer->clear(); _children[0]->close(); diff --git a/src/mongo/db/exec/sbe/stages/spool.h b/src/mongo/db/exec/sbe/stages/spool.h index d6eabed2026..666ef4cb976 100644 --- a/src/mongo/db/exec/sbe/stages/spool.h +++ b/src/mongo/db/exec/sbe/stages/spool.h @@ -113,13 +113,16 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + private: std::shared_ptr<SpoolBuffer> _buffer{nullptr}; const SpoolId _spoolId; const value::SlotVector _vals; std::vector<value::SlotAccessor*> _inAccessors; - value::SlotMap<value::ViewOfValueAccessor> _outAccessors; + value::SlotMap<value::OwnedValueAccessor> _outAccessors; std::unique_ptr<EExpression> _predicate; std::unique_ptr<vm::CodeFragment> _predicateCode; @@ -217,7 +220,7 @@ public: void close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); } std::unique_ptr<PlanStageStats> getStats(bool includeDebugInfo) const { diff --git a/src/mongo/db/exec/sbe/stages/stages.h b/src/mongo/db/exec/sbe/stages/stages.h index 034eab605a3..f545fb21361 100644 --- a/src/mongo/db/exec/sbe/stages/stages.h +++ b/src/mongo/db/exec/sbe/stages/stages.h @@ -128,11 +128,14 @@ public: void saveState() { auto stage = static_cast<T*>(this); stage->_commonStats.yields++; - for (auto&& child : stage->_children) { - child->saveState(); - } stage->doSaveState(); + // Save the children in a right to left order so dependent stages (i.e. one using correlated + // slots) are saved first. + auto& children = stage->_children; + for (auto it = children.rbegin(); it != children.rend(); it++) { + (*it)->saveState(); + } } /** @@ -251,13 +254,29 @@ protected: PlanState trackPlanState(PlanState state) { if (state == PlanState::IS_EOF) { _commonStats.isEOF = true; + _slotsAccessible = false; } else { invariant(state == PlanState::ADVANCED); _commonStats.advances++; + _slotsAccessible = true; } return state; } + + void trackClose() { + _commonStats.closes++; + _slotsAccessible = false; + } + + bool slotsAccessible() const { + return _slotsAccessible; + } + + void disableSlotAccess() { + _slotsAccessible = false; + } + /** * Returns an optional timer which is used to collect time spent executing the current stage. * May return boost::none if it is not necessary to collect timing info. @@ -272,6 +291,15 @@ protected: } CommonStats _commonStats; + +private: + /** + * In general, accessors can be accesed only after getNext returns a row. It is most definitely + * not OK to access accessors in ANY other state; e.g. closed, not yet opened, after EOF. We + * need this tracker to support unfortunate consequences of the internal yielding feature. Once + * that feature is retired we can then simply revisit all stages and simplify them. + */ + bool _slotsAccessible{false}; }; /** diff --git a/src/mongo/db/exec/sbe/stages/traverse.cpp b/src/mongo/db/exec/sbe/stages/traverse.cpp index 7f78a4c01a0..a3ad6074ce3 100644 --- a/src/mongo/db/exec/sbe/stages/traverse.cpp +++ b/src/mongo/db/exec/sbe/stages/traverse.cpp @@ -163,8 +163,8 @@ bool TraverseStage::traverse(value::SlotAccessor* inFieldAccessor, if (value::isArray(tag)) { // If it is an array then we have to traverse it. - value::ArrayAccessor inArrayAccessor; - inArrayAccessor.reset(tag, val); + auto& inArrayAccessor = _inArrayAccessors.emplace_back(value::ArrayAccessor{}); + inArrayAccessor.reset(inFieldAccessor); value::Array* arrOut{nullptr}; if (!_foldCode) { @@ -236,14 +236,15 @@ bool TraverseStage::traverse(value::SlotAccessor* inFieldAccessor, } } } + + _inArrayAccessors.pop_back(); } else { // For non-arrays we simply execute the inner side once. + outFieldOutputAccessor->reset(); openInner(tag, val); auto state = _children[1]->getNext(); - if (state == PlanState::IS_EOF) { - outFieldOutputAccessor->reset(); - } else { + if (state == PlanState::ADVANCED) { auto [tag, val] = _outFieldInputAccessor->getViewOfValue(); // We don't have to copy the value. outFieldOutputAccessor->reset(false, tag, val); @@ -256,7 +257,7 @@ bool TraverseStage::traverse(value::SlotAccessor* inFieldAccessor, void TraverseStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); if (_reOpenInner) { _children[1]->close(); @@ -267,6 +268,24 @@ void TraverseStage::close() { _children[0]->close(); } +void TraverseStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + _outFieldOutputAccessor.makeOwned(); +} + +void TraverseStage::doRestoreState() { + if (!slotsAccessible()) { + return; + } + + for (auto& accessor : _inArrayAccessors) { + accessor.refresh(); + } +} + std::unique_ptr<PlanStageStats> TraverseStage::getStats(bool includeDebugInfo) const { auto ret = std::make_unique<PlanStageStats>(_commonStats); ret->specific = std::make_unique<TraverseStats>(_specificStats); diff --git a/src/mongo/db/exec/sbe/stages/traverse.h b/src/mongo/db/exec/sbe/stages/traverse.h index b2dbe881372..ee82e75d6b4 100644 --- a/src/mongo/db/exec/sbe/stages/traverse.h +++ b/src/mongo/db/exec/sbe/stages/traverse.h @@ -70,6 +70,10 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + void doRestoreState() final; + private: void openInner(value::TypeTags tag, value::Value val); bool traverse(value::SlotAccessor* inFieldAccessor, @@ -105,6 +109,8 @@ private: std::unique_ptr<vm::CodeFragment> _foldCode; std::unique_ptr<vm::CodeFragment> _finalCode; + std::list<value::ArrayAccessor> _inArrayAccessors; + vm::ByteCode _bytecode; bool _compiled{false}; diff --git a/src/mongo/db/exec/sbe/stages/union.cpp b/src/mongo/db/exec/sbe/stages/union.cpp index 9956b1c91f2..1752a3a90a9 100644 --- a/src/mongo/db/exec/sbe/stages/union.cpp +++ b/src/mongo/db/exec/sbe/stages/union.cpp @@ -150,7 +150,7 @@ PlanState UnionStage::getNext() { void UnionStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _currentStage = nullptr; while (!_remainingBranchesToDrain.empty()) { _remainingBranchesToDrain.front().close(); diff --git a/src/mongo/db/exec/sbe/stages/unique.cpp b/src/mongo/db/exec/sbe/stages/unique.cpp index 631a9976e3f..38081b3ff87 100644 --- a/src/mongo/db/exec/sbe/stages/unique.cpp +++ b/src/mongo/db/exec/sbe/stages/unique.cpp @@ -89,7 +89,7 @@ PlanState UniqueStage::getNext() { void UniqueStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _seen.clear(); _children[0]->close(); diff --git a/src/mongo/db/exec/sbe/stages/unwind.cpp b/src/mongo/db/exec/sbe/stages/unwind.cpp index 63f3af745d1..6bcf87bd176 100644 --- a/src/mongo/db/exec/sbe/stages/unwind.cpp +++ b/src/mongo/db/exec/sbe/stages/unwind.cpp @@ -68,10 +68,10 @@ void UnwindStage::prepare(CompileCtx& ctx) { _inFieldAccessor = _children[0]->getAccessor(ctx, _inField); // Prepare the outField output accessor. - _outFieldOutputAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _outFieldOutputAccessor = std::make_unique<value::OwnedValueAccessor>(); // Prepare the outIndex output accessor. - _outIndexOutputAccessor = std::make_unique<value::ViewOfValueAccessor>(); + _outIndexOutputAccessor = std::make_unique<value::OwnedValueAccessor>(); } value::SlotAccessor* UnwindStage::getAccessor(CompileCtx& ctx, value::SlotId slot) { @@ -101,6 +101,10 @@ PlanState UnwindStage::getNext() { if (!_inArray) { do { + // We are about to call getNext() on our child so do not bother saving our internal + // state in case it yields as the state will be completely overwritten after the + // getNext() call. + disableSlotAccess(); auto state = _children[0]->getNext(); if (state != PlanState::ADVANCED) { return trackPlanState(state); @@ -110,7 +114,7 @@ PlanState UnwindStage::getNext() { auto [tag, val] = _inFieldAccessor->getViewOfValue(); if (value::isArray(tag)) { - _inArrayAccessor.reset(tag, val); + _inArrayAccessor.reset(_inFieldAccessor); _index = 0; _inArray = true; @@ -118,8 +122,9 @@ PlanState UnwindStage::getNext() { if (_inArrayAccessor.atEnd()) { _inArray = false; if (_preserveNullAndEmptyArrays) { - _outFieldOutputAccessor->reset(value::TypeTags::Nothing, 0); - _outIndexOutputAccessor->reset(value::TypeTags::NumberInt64, + _outFieldOutputAccessor->reset(false, value::TypeTags::Nothing, 0); + _outIndexOutputAccessor->reset(false, + value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(_index)); return trackPlanState(PlanState::ADVANCED); } @@ -129,8 +134,8 @@ PlanState UnwindStage::getNext() { tag == value::TypeTags::Null || tag == value::TypeTags::Nothing; if (!nullOrNothing || _preserveNullAndEmptyArrays) { - _outFieldOutputAccessor->reset(tag, val); - _outIndexOutputAccessor->reset(value::TypeTags::Nothing, 0); + _outFieldOutputAccessor->reset(false, tag, val); + _outIndexOutputAccessor->reset(false, value::TypeTags::Nothing, 0); return trackPlanState(PlanState::ADVANCED); } } @@ -140,9 +145,9 @@ PlanState UnwindStage::getNext() { // We are inside the array so pull out the current element and advance. auto [tagElem, valElem] = _inArrayAccessor.getViewOfValue(); - _outFieldOutputAccessor->reset(tagElem, valElem); - _outIndexOutputAccessor->reset(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(_index)); + _outFieldOutputAccessor->reset(false, tagElem, valElem); + _outIndexOutputAccessor->reset( + false, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(_index)); _inArrayAccessor.advance(); ++_index; @@ -157,7 +162,7 @@ PlanState UnwindStage::getNext() { void UnwindStage::close() { auto optTimer(getOptTimer(_opCtx)); - _commonStats.closes++; + trackClose(); _children[0]->close(); } @@ -194,4 +199,25 @@ std::vector<DebugPrinter::Block> UnwindStage::debugPrint() const { return ret; } + +void UnwindStage::doSaveState() { + if (!slotsAccessible()) { + return; + } + + if (_outFieldOutputAccessor) { + _outFieldOutputAccessor->makeOwned(); + } + if (_outIndexOutputAccessor) { + _outIndexOutputAccessor->makeOwned(); + } +} + +void UnwindStage::doRestoreState() { + if (!slotsAccessible()) { + return; + } + + _inArrayAccessor.refresh(); +} } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/unwind.h b/src/mongo/db/exec/sbe/stages/unwind.h index 17076ca881d..cf95a7d1d6b 100644 --- a/src/mongo/db/exec/sbe/stages/unwind.h +++ b/src/mongo/db/exec/sbe/stages/unwind.h @@ -53,6 +53,10 @@ public: const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; +protected: + void doSaveState() final; + void doRestoreState() final; + private: const value::SlotId _inField; const value::SlotId _outField; @@ -60,8 +64,8 @@ private: const bool _preserveNullAndEmptyArrays; value::SlotAccessor* _inFieldAccessor{nullptr}; - std::unique_ptr<value::ViewOfValueAccessor> _outFieldOutputAccessor; - std::unique_ptr<value::ViewOfValueAccessor> _outIndexOutputAccessor; + std::unique_ptr<value::OwnedValueAccessor> _outFieldOutputAccessor; + std::unique_ptr<value::OwnedValueAccessor> _outIndexOutputAccessor; value::ArrayAccessor _inArrayAccessor; diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h index 2ba876da923..6c249e37b36 100644 --- a/src/mongo/db/exec/sbe/values/slot.h +++ b/src/mongo/db/exec/sbe/values/slot.h @@ -186,6 +186,15 @@ public: _owned = owned; } + void makeOwned() { + if (_owned) { + return; + } + + std::tie(_tag, _val) = copyValue(_tag, _val); + _owned = true; + } + private: void release() { if (_owned) { @@ -208,8 +217,10 @@ private: */ class ArrayAccessor final : public SlotAccessor { public: - void reset(TypeTags tag, Value val) { - _enumerator.reset(tag, val); + void reset(SlotAccessor* input) { + _input = input; + _currentIndex = 0; + refresh(); } // Return non-owning view of the value. @@ -227,10 +238,18 @@ public: } bool advance() { + ++_currentIndex; return _enumerator.advance(); } + void refresh() { + auto [tag, val] = _input->getViewOfValue(); + _enumerator.reset(tag, val, _currentIndex); + } + private: + size_t _currentIndex = 0; + SlotAccessor* _input{nullptr}; ArrayEnumerator _enumerator; }; @@ -628,7 +647,7 @@ void readKeyStringValueIntoAccessors( const KeyString::Value& keyString, const Ordering& ordering, BufBuilder* valueBufferBuilder, - std::vector<ViewOfValueAccessor>* accessors, + std::vector<OwnedValueAccessor>* accessors, boost::optional<IndexKeysInclusionSet> indexKeysToInclude = boost::none); @@ -638,7 +657,8 @@ void readKeyStringValueIntoAccessors( template <typename T> using SlotMap = absl::flat_hash_map<SlotId, T>; using SlotAccessorMap = SlotMap<SlotAccessor*>; -using FieldAccessorMap = StringMap<std::unique_ptr<ViewOfValueAccessor>>; +using FieldAccessorMap = StringMap<std::unique_ptr<OwnedValueAccessor>>; +using FieldViewAccessorMap = StringMap<std::unique_ptr<ViewOfValueAccessor>>; using SlotSet = absl::flat_hash_set<SlotId>; using SlotVector = std::vector<SlotId>; diff --git a/src/mongo/db/exec/sbe/values/value.cpp b/src/mongo/db/exec/sbe/values/value.cpp index bb7790f8da5..48a18a327c6 100644 --- a/src/mongo/db/exec/sbe/values/value.cpp +++ b/src/mongo/db/exec/sbe/values/value.cpp @@ -1149,7 +1149,7 @@ StringData ObjectEnumerator::getFieldName() const { void readKeyStringValueIntoAccessors(const KeyString::Value& keyString, const Ordering& ordering, BufBuilder* valueBufferBuilder, - std::vector<ViewOfValueAccessor>* accessors, + std::vector<OwnedValueAccessor>* accessors, boost::optional<IndexKeysInclusionSet> indexKeysToInclude) { ValueBuilder valBuilder(valueBufferBuilder); invariant(!indexKeysToInclude || indexKeysToInclude->count() == accessors->size()); diff --git a/src/mongo/db/exec/sbe/values/value.h b/src/mongo/db/exec/sbe/values/value.h index 0de10a27861..41e3b6e0890 100644 --- a/src/mongo/db/exec/sbe/values/value.h +++ b/src/mongo/db/exec/sbe/values/value.h @@ -1219,7 +1219,7 @@ public: reset(tag, val); } - void reset(TypeTags tag, Value val) { + void reset(TypeTags tag, Value val, size_t index = 0) { _tagArray = tag; _valArray = val; _array = nullptr; @@ -1228,15 +1228,22 @@ public: if (tag == TypeTags::Array) { _array = getArrayView(val); - } else if (tag == TypeTags::ArraySet) { - _arraySet = getArraySetView(val); - _iter = _arraySet->values().begin(); - } else if (tag == TypeTags::bsonArray) { - auto bson = getRawPointerView(val); - _arrayCurrent = bson + 4; - _arrayEnd = bson + ConstDataView(bson).read<LittleEndian<uint32_t>>(); + _index = index; } else { - MONGO_UNREACHABLE; + if (tag == TypeTags::ArraySet) { + _arraySet = getArraySetView(val); + _iter = _arraySet->values().begin(); + } else if (tag == TypeTags::bsonArray) { + auto bson = getRawPointerView(val); + _arrayCurrent = bson + 4; + _arrayEnd = bson + ConstDataView(bson).read<LittleEndian<uint32_t>>(); + } else { + MONGO_UNREACHABLE; + } + + for (size_t i = 0; !atEnd() && i < index; i++) { + advance(); + } } } diff --git a/src/mongo/db/exec/sbe/values/value_builder.h b/src/mongo/db/exec/sbe/values/value_builder.h index 56a533db989..cee35b3f4e9 100644 --- a/src/mongo/db/exec/sbe/values/value_builder.h +++ b/src/mongo/db/exec/sbe/values/value_builder.h @@ -42,7 +42,7 @@ namespace mongo::sbe::value { * sbe::value::Value. During construction, these pairs are stored in the parallel '_tagList' and * '_valList' arrays, as a "structure of arrays." * - * After constructing the array, use the 'readValues()' method to populate a ViewOfValueAccessor + * After constructing the array, use the 'readValues()' method to populate a OwnedValueAccessor * vector. Some "views" (values that are pointers into other memory) are constructed by appending * them to the 'valueBufferBuilder' provided to the constructor, and the internal buffer in that * 'valueBufferBuilder' must be kept alive for as long as the accessors are to remain valid. @@ -198,7 +198,7 @@ public: * into the memory constructed by the '_valueBufferBuilder' object, which is a caller-owned * object that must remain valid for as long as these accessors are to remain valid. */ - void readValues(std::vector<ViewOfValueAccessor>* accessors) { + void readValues(std::vector<OwnedValueAccessor>* accessors) { auto bufferLen = _valueBufferBuilder->len(); for (size_t i = 0; i < _numValues; ++i) { auto tag = _tagList[i]; @@ -229,7 +229,7 @@ public: } invariant(i < accessors->size()); - (*accessors)[i].reset(tag, val); + (*accessors)[i].reset(false, tag, val); } } diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index 0d58f7731a9..575ef7f1d39 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -39,6 +39,8 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/plan_executor_factory.h" +#include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/query/yield_policy_callbacks_impl.h" namespace mongo { /** @@ -69,15 +71,24 @@ public: uassertStatusOK(CursorRequest::parseCommandCursorOptions( cmdObj, query_request_helper::kDefaultBatchSize, &batchSize)); + NamespaceString nss{dbname}; + + auto yieldPolicy = std::make_unique<PlanYieldPolicySBE>( + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + opCtx->getServiceContext()->getFastClockSource(), + internalQueryExecYieldIterations.load(), + Milliseconds{internalQueryExecYieldPeriodMS.load()}, + nullptr, + std::make_unique<YieldPolicyCallbacksImpl>(nss)); + auto env = std::make_unique<sbe::RuntimeEnvironment>(); sbe::Parser parser(env.get()); - auto root = parser.parse(opCtx, dbname, cmdObj["sbe"].String()); + auto root = parser.parse(opCtx, dbname, cmdObj["sbe"].String(), yieldPolicy.get()); auto [resultSlot, recordIdSlot] = parser.getTopLevelSlots(); std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; BSONArrayBuilder firstBatch; - NamespaceString nss{dbname}; // Create a trivial cannonical query for the 'sbe' command execution. auto statusWithCQ = @@ -94,7 +105,6 @@ public: } root->attachToOperationContext(opCtx); - exec = uassertStatusOK(plan_executor_factory::make(opCtx, std::move(cq), nullptr, @@ -102,7 +112,7 @@ public: &CollectionPtr::null, false, /* returnOwnedBson */ nss, - nullptr)); + std::move(yieldPolicy))); for (long long objCount = 0; objCount < batchSize; objCount++) { BSONObj next; PlanExecutor::ExecState state = exec->getNext(&next, nullptr); diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 822e28883d8..b4905fce9ec 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -118,6 +118,7 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx, void PlanExecutorSBE::saveState() { _root->saveState(); _yieldPolicy->setYieldable(nullptr); + _lastGetNext = {}; } void PlanExecutorSBE::restoreState(const RestoreContext& context) { @@ -235,6 +236,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut) if (result == sbe::PlanState::IS_EOF) { _root->close(); _state = State::kClosed; + _lastGetNext = {}; if (MONGO_unlikely(planExecutorHangBeforeShouldWaitForInserts.shouldFail( [this](const BSONObj& data) { @@ -264,6 +266,9 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut) } invariant(result == sbe::PlanState::ADVANCED); + if (_mustReturnOwnedBson) { + _lastGetNext = *out; + } return PlanExecutor::ExecState::ADVANCED; } } diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index 918b2c8670a..191853cc9ce 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -155,6 +155,9 @@ private: boost::optional<sbe::value::SlotId> _resumeRecordIdSlot; std::queue<std::pair<BSONObj, boost::optional<RecordId>>> _stash; + // If we are returning owned result (i.e. value is moved out of the result accessor) then its + // lifetime must extend up to the next getNext (or saveState). + BSONObj _lastGetNext; // If _killStatus has a non-OK value, then we have been killed and the value represents the // reason for the kill. |