diff options
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.h | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan_common.h | 3 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/abt/abt_lower.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/scan.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/scan.h | 5 |
6 files changed, 45 insertions, 1 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index e9deeaa9069..0a3eceafa02 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -135,9 +135,15 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { if (_commonStats.isEOF) { + _priority.reset(); return PlanStage::IS_EOF; } + if (_params.lowPriority && !_priority && opCtx()->getClient()->isFromUserConnection() && + opCtx()->lockState()->shouldWaitForTicket()) { + _priority.emplace(opCtx()->lockState(), AdmissionContext::Priority::kLow); + } + boost::optional<Record> record; const bool needToMakeCursor = !_cursor; @@ -252,6 +258,7 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { if (_params.shouldTrackLatestOplogTimestamp && collection()->ns().isChangeCollection()) { setLatestOplogEntryTimestampToReadTimestamp(); } + _priority.reset(); return PlanStage::IS_EOF; } @@ -461,9 +468,15 @@ void CollectionScan::doRestoreStateRequiresCollection() { void CollectionScan::doDetachFromOperationContext() { if (_cursor) _cursor->detachFromOperationContext(); + + _priority.reset(); } void CollectionScan::doReattachToOperationContext() { + if (_params.lowPriority && opCtx()->getClient()->isFromUserConnection() && + opCtx()->lockState()->shouldWaitForTicket()) { + _priority.emplace(opCtx()->lockState(), AdmissionContext::Priority::kLow); + } if (_cursor) _cursor->reattachToOperationContext(opCtx()); } diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 6f4509b5836..8a465b9cdc0 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -142,6 +142,8 @@ private: // on EOF we advance this timestamp to the latest timestamp in the global oplog. Timestamp _latestOplogEntryTimestamp; + boost::optional<ScopedAdmissionPriorityForLock> _priority; + // Stats CollectionScanStats _specificStats; }; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 2031fafe8b4..64e896ff9d6 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -115,6 +115,9 @@ struct CollectionScanParams { // Whether or not to return EOF and stop further scanning once MatchExpression evaluates to // false. Can only be set to true if the MatchExpression is present. bool shouldReturnEofOnFilterMismatch = false; + + // Whether the collection scan should have low storage admission priority. + bool lowPriority = false; }; } // namespace mongo diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp index 7496e25f67d..6f9a7fd5463 100644 --- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp +++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp @@ -1090,6 +1090,7 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const PhysicalScanNode& n, nullptr /*yieldPolicy*/, planNodeId, callbacks, + false, /* lowPriority */ _scanOrder == ScanOrder::Random); } else { tasserted(6624355, "Unknown scan type."); diff --git a/src/mongo/db/exec/sbe/stages/scan.cpp b/src/mongo/db/exec/sbe/stages/scan.cpp index 5175529f99a..b8713381f04 100644 --- a/src/mongo/db/exec/sbe/stages/scan.cpp +++ b/src/mongo/db/exec/sbe/stages/scan.cpp @@ -58,6 +58,7 @@ ScanStage::ScanStage(UUID collectionUuid, PlanYieldPolicy* yieldPolicy, PlanNodeId nodeId, ScanCallbacks scanCallbacks, + bool lowPriority, bool useRandomCursor, bool participateInTrialRunTracking) : PlanStage( @@ -75,7 +76,8 @@ ScanStage::ScanStage(UUID collectionUuid, _seekKeySlot(seekKeySlot), _forward(forward), _scanCallbacks(std::move(scanCallbacks)), - _useRandomCursor(useRandomCursor) { + _useRandomCursor(useRandomCursor), + _lowPriority(lowPriority) { invariant(_fields.size() == _vars.size()); invariant(!_seekKeySlot || _forward); tassert(5567202, @@ -108,6 +110,7 @@ std::unique_ptr<PlanStage> ScanStage::clone() const { _yieldPolicy, _commonStats.nodeId, _scanCallbacks, + _lowPriority, _useRandomCursor, _participateInTrialRunTracking); } @@ -306,9 +309,14 @@ void ScanStage::doDetachFromOperationContext() { if (auto cursor = getActiveCursor()) { cursor->detachFromOperationContext(); } + _priority.reset(); } void ScanStage::doAttachToOperationContext(OperationContext* opCtx) { + if (_lowPriority && _open && opCtx->getClient()->isFromUserConnection() && + opCtx->lockState()->shouldWaitForTicket()) { + _priority.emplace(opCtx->lockState(), AdmissionContext::Priority::kLow); + } if (auto cursor = getActiveCursor()) { cursor->reattachToOperationContext(opCtx); } @@ -408,6 +416,11 @@ value::OwnedValueAccessor* ScanStage::getFieldAccessor(StringData name, size_t o PlanState ScanStage::getNext() { auto optTimer(getOptTimer(_opCtx)); + if (_lowPriority && !_priority && _opCtx->getClient()->isFromUserConnection() && + _opCtx->lockState()->shouldWaitForTicket()) { + _priority.emplace(_opCtx->lockState(), AdmissionContext::Priority::kLow); + } + // We are about to call next() on a storage cursor so do not bother saving our internal state in // case it yields as the state will be completely overwritten after the next() call. disableSlotAccess(); @@ -449,6 +462,7 @@ PlanState ScanStage::getNext() { _key, *_collName); } + _priority.reset(); return trackPlanState(PlanState::IS_EOF); } @@ -456,6 +470,7 @@ PlanState ScanStage::getNext() { if (_scanCallbacks.indexKeyConsistencyCheckCallBack && !_scanCallbacks.indexKeyConsistencyCheckCallBack( _opCtx, _snapshotIdAccessor, _indexIdAccessor, _indexKeyAccessor, _coll, *nextRecord)) { + _priority.reset(); return trackPlanState(PlanState::IS_EOF); } @@ -562,6 +577,7 @@ void ScanStage::close() { _cursor.reset(); _randomCursor.reset(); _coll.reset(); + _priority.reset(); _open = false; } @@ -652,6 +668,10 @@ std::vector<DebugPrinter::Block> ScanStage::debugPrint() const { DebugPrinter::addKeyword(ret, "random"); } + if (_lowPriority) { + DebugPrinter::addKeyword(ret, "lowPriority"); + } + ret.emplace_back(DebugPrinter::Block("[`")); for (size_t idx = 0; idx < _fields.size(); ++idx) { if (idx) { diff --git a/src/mongo/db/exec/sbe/stages/scan.h b/src/mongo/db/exec/sbe/stages/scan.h index 6642c4ce1e8..33284950171 100644 --- a/src/mongo/db/exec/sbe/stages/scan.h +++ b/src/mongo/db/exec/sbe/stages/scan.h @@ -110,6 +110,7 @@ public: PlanYieldPolicy* yieldPolicy, PlanNodeId nodeId, ScanCallbacks scanCallbacks, + bool lowPriority = false, bool useRandomCursor = false, bool participateInTrialRunTracking = true); @@ -232,6 +233,10 @@ private: RecordId _key; bool _firstGetNext{false}; + // Whether the scan should have low storage admission priority. + bool _lowPriority; + boost::optional<ScopedAdmissionPriorityForLock> _priority; + ScanStats _specificStats; // Flag set upon restoring the stage that indicates whether the cursor's position in the |