diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/sbe/stages/loop_join.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/loop_join.h | 12 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/sort.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/stages.h | 14 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/traverse.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/traverse.h | 10 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/union.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/union.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor_sbe.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/shared_buffer.h | 25 |
10 files changed, 92 insertions, 34 deletions
diff --git a/src/mongo/db/exec/sbe/stages/loop_join.cpp b/src/mongo/db/exec/sbe/stages/loop_join.cpp index 290d8523572..52e4ab73c9e 100644 --- a/src/mongo/db/exec/sbe/stages/loop_join.cpp +++ b/src/mongo/db/exec/sbe/stages/loop_join.cpp @@ -108,7 +108,7 @@ PlanState LoopJoinStage::getNext() { auto optTimer(getOptTimer(_opCtx)); if (_outerGetNext) { - auto state = _children[0]->getNext(); + auto state = getNextOuterSide(); if (state != PlanState::ADVANCED) { return trackPlanState(state); } @@ -137,7 +137,7 @@ PlanState LoopJoinStage::getNext() { } invariant(state == PlanState::IS_EOF); - state = _children[0]->getNext(); + state = getNextOuterSide(); if (state != PlanState::ADVANCED) { return trackPlanState(state); } @@ -161,6 +161,15 @@ void LoopJoinStage::close() { _children[0]->close(); } +void LoopJoinStage::doSaveState() { + if (_isReadingLeftSide || _outerGetNext) { + // If we yield while reading the left side, there is no need to makeOwned() data held in + // the right side, since we will have to re-open it anyway. + const bool recursive = true; + _children[1]->disableSlotAccess(recursive); + } +} + std::unique_ptr<PlanStageStats> LoopJoinStage::getStats(bool includeDebugInfo) const { auto ret = std::make_unique<PlanStageStats>(_commonStats); diff --git a/src/mongo/db/exec/sbe/stages/loop_join.h b/src/mongo/db/exec/sbe/stages/loop_join.h index 7c72cf21669..3aad41a4cde 100644 --- a/src/mongo/db/exec/sbe/stages/loop_join.h +++ b/src/mongo/db/exec/sbe/stages/loop_join.h @@ -50,12 +50,20 @@ public: void open(bool reOpen) final; PlanState getNext() final; void close() final; + void doSaveState() final; std::unique_ptr<PlanStageStats> getStats(bool includeDebugInfo) const final; const SpecificStats* getSpecificStats() const final; std::vector<DebugPrinter::Block> debugPrint() const final; private: + PlanState getNextOuterSide() { + _isReadingLeftSide = true; + auto ret = _children[0]->getNext(); + _isReadingLeftSide = false; + return ret; + } + // Set of variables coming from the outer side. const value::SlotVector _outerProjects; // Set of correlated variables from the outer side that are visible on the inner side. @@ -73,6 +81,10 @@ private: bool _outerGetNext{false}; LoopJoinStats _specificStats; + // Tracks whether or not we're reading from the left child or the right child. + // This is necessary for yielding. + bool _isReadingLeftSide = false; + void openInner(); }; } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/sort.cpp b/src/mongo/db/exec/sbe/stages/sort.cpp index b1ba46b8014..2d799f423f9 100644 --- a/src/mongo/db/exec/sbe/stages/sort.cpp +++ b/src/mongo/db/exec/sbe/stages/sort.cpp @@ -171,14 +171,16 @@ void SortStage::open(bool reOpen) { size_t idx = 0; for (auto accessor : _inKeyAccessors) { - auto [tag, val] = accessor->copyOrMoveValue(); - keys.reset(idx++, true, tag, val); + auto [tag, val] = accessor->getViewOfValue(); + auto [cTag, cVal] = copyValue(tag, val); + keys.reset(idx++, true, cTag, cVal); } idx = 0; for (auto accessor : _inValueAccessors) { - auto [tag, val] = accessor->copyOrMoveValue(); - vals.reset(idx++, true, tag, val); + auto [tag, val] = accessor->getViewOfValue(); + auto [cTag, cVal] = copyValue(tag, val); + vals.reset(idx++, true, cTag, cVal); } _sorter->emplace(std::move(keys), std::move(vals)); diff --git a/src/mongo/db/exec/sbe/stages/stages.h b/src/mongo/db/exec/sbe/stages/stages.h index f545fb21361..7deda784ecf 100644 --- a/src/mongo/db/exec/sbe/stages/stages.h +++ b/src/mongo/db/exec/sbe/stages/stages.h @@ -250,6 +250,16 @@ public: } } + void disableSlotAccess(bool recursive = false) { + auto stage = static_cast<T*>(this); + stage->_slotsAccessible = false; + if (recursive) { + for (auto& child : stage->_children) { + child->disableSlotAccess(true); + } + } + } + protected: PlanState trackPlanState(PlanState state) { if (state == PlanState::IS_EOF) { @@ -273,10 +283,6 @@ protected: return _slotsAccessible; } - void disableSlotAccess() { - _slotsAccessible = false; - } - /** * Returns an optional timer which is used to collect time spent executing the current stage. * May return boost::none if it is not necessary to collect timing info. diff --git a/src/mongo/db/exec/sbe/stages/traverse.cpp b/src/mongo/db/exec/sbe/stages/traverse.cpp index a3ad6074ce3..ae00f674187 100644 --- a/src/mongo/db/exec/sbe/stages/traverse.cpp +++ b/src/mongo/db/exec/sbe/stages/traverse.cpp @@ -144,11 +144,10 @@ void TraverseStage::openInner(value::TypeTags tag, value::Value val) { PlanState TraverseStage::getNext() { auto optTimer(getOptTimer(_opCtx)); - auto state = _children[0]->getNext(); + auto state = getNextOuterSide(); if (state != PlanState::ADVANCED) { return trackPlanState(state); } - [[maybe_unused]] auto earlyExit = traverse(_inFieldAccessor, &_outFieldOutputAccessor, 0); return trackPlanState(PlanState::ADVANCED); @@ -269,6 +268,17 @@ void TraverseStage::close() { } void TraverseStage::doSaveState() { + if (_isReadingLeftSide) { + // If we yield while reading the left side, there is no need to makeOwned() data held in + // the right side, since we will have to re-open it anyway. + const bool recursive = true; + _children[1]->disableSlotAccess(recursive); + + // As part of reading the left side we're about to reset the out field accessor anyway. + // No point in keeping its data around. + _outFieldOutputAccessor.reset(); + } + if (!slotsAccessible()) { return; } diff --git a/src/mongo/db/exec/sbe/stages/traverse.h b/src/mongo/db/exec/sbe/stages/traverse.h index ee82e75d6b4..3cebddbb661 100644 --- a/src/mongo/db/exec/sbe/stages/traverse.h +++ b/src/mongo/db/exec/sbe/stages/traverse.h @@ -79,6 +79,12 @@ private: bool traverse(value::SlotAccessor* inFieldAccessor, value::OwnedValueAccessor* outFieldOutputAccessor, size_t level); + PlanState getNextOuterSide() { + _isReadingLeftSide = true; + auto ret = _children[0]->getNext(); + _isReadingLeftSide = false; + return ret; + } // The input slot holding value being traversed. const value::SlotId _inField; @@ -116,5 +122,9 @@ private: bool _compiled{false}; bool _reOpenInner{false}; TraverseStats _specificStats; + + // Tracks whether or not we're reading from the left child or the right child. + // This is necessary for yielding. + bool _isReadingLeftSide = false; }; } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/union.cpp b/src/mongo/db/exec/sbe/stages/union.cpp index 1752a3a90a9..95a9045291e 100644 --- a/src/mongo/db/exec/sbe/stages/union.cpp +++ b/src/mongo/db/exec/sbe/stages/union.cpp @@ -102,8 +102,10 @@ void UnionStage::open(bool reOpen) { _commonStats.opens++; if (reOpen) { - std::queue<UnionBranch> emptyQueue; - swap(_remainingBranchesToDrain, emptyQueue); + // If we are re-opening, it is important to close() any active branches. If kept open, one + // of these branch's slots may soon hold pointers to stale (potentially freed) data. A + // yield would then cause the branch to attempt to copy the stale(unowned) data. + clearBranches(); } for (auto& child : _children) { @@ -151,11 +153,7 @@ void UnionStage::close() { auto optTimer(getOptTimer(_opCtx)); trackClose(); - _currentStage = nullptr; - while (!_remainingBranchesToDrain.empty()) { - _remainingBranchesToDrain.front().close(); - _remainingBranchesToDrain.pop(); - } + clearBranches(); } std::unique_ptr<PlanStageStats> UnionStage::getStats(bool includeDebugInfo) const { @@ -218,4 +216,14 @@ std::vector<DebugPrinter::Block> UnionStage::debugPrint() const { return ret; } + +void UnionStage::clearBranches() { + while (!_remainingBranchesToDrain.empty()) { + auto& branch = _remainingBranchesToDrain.front(); + if (branch.isOpen) { + branch.close(); + } + _remainingBranchesToDrain.pop(); + } +} } // namespace mongo::sbe diff --git a/src/mongo/db/exec/sbe/stages/union.h b/src/mongo/db/exec/sbe/stages/union.h index 26bd3491330..f648b66e81d 100644 --- a/src/mongo/db/exec/sbe/stages/union.h +++ b/src/mongo/db/exec/sbe/stages/union.h @@ -74,6 +74,8 @@ private: } }; + void clearBranches(); + const std::vector<value::SlotVector> _inputVals; const value::SlotVector _outputVals; std::vector<value::SwitchAccessor> _outValueAccessors; diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index b4905fce9ec..a62dfc5b8aa 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -89,6 +89,10 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx, if (!winner.results.empty()) { _stash = std::move(winner.results); + // The PlanExecutor keeps an extra reference to the last object pulled out of the PlanStage + // tree. This is because we want to ensure that the caller of PlanExecutor::getNext() does + // not free the object and leave a dangling pointer in the PlanStage tree. + _lastGetNext = _stash.back().first; } // Callers are allowed to disable yielding for this plan by passing a null yield policy. diff --git a/src/mongo/util/shared_buffer.h b/src/mongo/util/shared_buffer.h index 4b34d837198..a40b924d4f3 100644 --- a/src/mongo/util/shared_buffer.h +++ b/src/mongo/util/shared_buffer.h @@ -36,6 +36,7 @@ #include "mongo/platform/atomic_word.h" +#include "mongo/base/data_view.h" #include "mongo/util/allocator.h" #include "mongo/util/assert_util.h" @@ -268,24 +269,25 @@ public: other._data = nullptr; } ~UniqueBuffer() { - freeBuffer(); + free(_data); } UniqueBuffer& operator=(const UniqueBuffer&) = delete; UniqueBuffer& operator=(UniqueBuffer&& other) { - freeBuffer(); - _data = other._data; - other._data = nullptr; + UniqueBuffer temp(std::move(other)); + swap(*this, temp); return *this; } - void swap(UniqueBuffer& other) { - std::swap(_data, other._data); + friend void swap(UniqueBuffer& lhs, UniqueBuffer& rhs) { + using std::swap; + swap(lhs._data, rhs._data); } void realloc(uint32_t size) { size_t realSize = size + SharedBuffer::kHolderSize; _data = reinterpret_cast<char*>(mongoRealloc(_data, realSize)); + DataView(_data).write<uint32_t>(size); } char* get() const { @@ -297,7 +299,7 @@ public: } size_t capacity() const { - return _data ? *reinterpret_cast<const uint32_t*>(_data) : 0; + return _data ? ConstDataView(_data).read<uint32_t>() : 0; } /** @@ -317,14 +319,7 @@ private: UniqueBuffer(void* buffer) : _data(static_cast<char*>(buffer)) {} UniqueBuffer(void* buffer, uint32_t sz) : _data(static_cast<char*>(buffer)) { - *reinterpret_cast<uint32_t*>(_data) = sz; - } - - void freeBuffer() { - if (_data) { - free(_data); - _data = nullptr; - } + DataView(_data).write<uint32_t>(sz); } char* _data = nullptr; |