summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/collection_scan.cpp13
-rw-r--r--src/mongo/db/exec/collection_scan.h2
-rw-r--r--src/mongo/db/exec/collection_scan_common.h3
-rw-r--r--src/mongo/db/exec/sbe/abt/abt_lower.cpp1
-rw-r--r--src/mongo/db/exec/sbe/stages/scan.cpp22
-rw-r--r--src/mongo/db/exec/sbe/stages/scan.h5
6 files changed, 45 insertions, 1 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index e9deeaa9069..0a3eceafa02 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -135,9 +135,15 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
if (_commonStats.isEOF) {
+ _priority.reset();
return PlanStage::IS_EOF;
}
+ if (_params.lowPriority && !_priority && opCtx()->getClient()->isFromUserConnection() &&
+ opCtx()->lockState()->shouldWaitForTicket()) {
+ _priority.emplace(opCtx()->lockState(), AdmissionContext::Priority::kLow);
+ }
+
boost::optional<Record> record;
const bool needToMakeCursor = !_cursor;
@@ -252,6 +258,7 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
if (_params.shouldTrackLatestOplogTimestamp && collection()->ns().isChangeCollection()) {
setLatestOplogEntryTimestampToReadTimestamp();
}
+ _priority.reset();
return PlanStage::IS_EOF;
}
@@ -461,9 +468,15 @@ void CollectionScan::doRestoreStateRequiresCollection() {
void CollectionScan::doDetachFromOperationContext() {
if (_cursor)
_cursor->detachFromOperationContext();
+
+ _priority.reset();
}
void CollectionScan::doReattachToOperationContext() {
+ if (_params.lowPriority && opCtx()->getClient()->isFromUserConnection() &&
+ opCtx()->lockState()->shouldWaitForTicket()) {
+ _priority.emplace(opCtx()->lockState(), AdmissionContext::Priority::kLow);
+ }
if (_cursor)
_cursor->reattachToOperationContext(opCtx());
}
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 6f4509b5836..8a465b9cdc0 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -142,6 +142,8 @@ private:
// on EOF we advance this timestamp to the latest timestamp in the global oplog.
Timestamp _latestOplogEntryTimestamp;
+ boost::optional<ScopedAdmissionPriorityForLock> _priority;
+
// Stats
CollectionScanStats _specificStats;
};
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 2031fafe8b4..64e896ff9d6 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -115,6 +115,9 @@ struct CollectionScanParams {
// Whether or not to return EOF and stop further scanning once MatchExpression evaluates to
// false. Can only be set to true if the MatchExpression is present.
bool shouldReturnEofOnFilterMismatch = false;
+
+ // Whether the collection scan should have low storage admission priority.
+ bool lowPriority = false;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
index 7496e25f67d..6f9a7fd5463 100644
--- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp
+++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
@@ -1090,6 +1090,7 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const PhysicalScanNode& n,
nullptr /*yieldPolicy*/,
planNodeId,
callbacks,
+ false, /* lowPriority */
_scanOrder == ScanOrder::Random);
} else {
tasserted(6624355, "Unknown scan type.");
diff --git a/src/mongo/db/exec/sbe/stages/scan.cpp b/src/mongo/db/exec/sbe/stages/scan.cpp
index 5175529f99a..b8713381f04 100644
--- a/src/mongo/db/exec/sbe/stages/scan.cpp
+++ b/src/mongo/db/exec/sbe/stages/scan.cpp
@@ -58,6 +58,7 @@ ScanStage::ScanStage(UUID collectionUuid,
PlanYieldPolicy* yieldPolicy,
PlanNodeId nodeId,
ScanCallbacks scanCallbacks,
+ bool lowPriority,
bool useRandomCursor,
bool participateInTrialRunTracking)
: PlanStage(
@@ -75,7 +76,8 @@ ScanStage::ScanStage(UUID collectionUuid,
_seekKeySlot(seekKeySlot),
_forward(forward),
_scanCallbacks(std::move(scanCallbacks)),
- _useRandomCursor(useRandomCursor) {
+ _useRandomCursor(useRandomCursor),
+ _lowPriority(lowPriority) {
invariant(_fields.size() == _vars.size());
invariant(!_seekKeySlot || _forward);
tassert(5567202,
@@ -108,6 +110,7 @@ std::unique_ptr<PlanStage> ScanStage::clone() const {
_yieldPolicy,
_commonStats.nodeId,
_scanCallbacks,
+ _lowPriority,
_useRandomCursor,
_participateInTrialRunTracking);
}
@@ -306,9 +309,14 @@ void ScanStage::doDetachFromOperationContext() {
if (auto cursor = getActiveCursor()) {
cursor->detachFromOperationContext();
}
+ _priority.reset();
}
void ScanStage::doAttachToOperationContext(OperationContext* opCtx) {
+ if (_lowPriority && _open && opCtx->getClient()->isFromUserConnection() &&
+ opCtx->lockState()->shouldWaitForTicket()) {
+ _priority.emplace(opCtx->lockState(), AdmissionContext::Priority::kLow);
+ }
if (auto cursor = getActiveCursor()) {
cursor->reattachToOperationContext(opCtx);
}
@@ -408,6 +416,11 @@ value::OwnedValueAccessor* ScanStage::getFieldAccessor(StringData name, size_t o
PlanState ScanStage::getNext() {
auto optTimer(getOptTimer(_opCtx));
+ if (_lowPriority && !_priority && _opCtx->getClient()->isFromUserConnection() &&
+ _opCtx->lockState()->shouldWaitForTicket()) {
+ _priority.emplace(_opCtx->lockState(), AdmissionContext::Priority::kLow);
+ }
+
// We are about to call next() on a storage cursor so do not bother saving our internal state in
// case it yields as the state will be completely overwritten after the next() call.
disableSlotAccess();
@@ -449,6 +462,7 @@ PlanState ScanStage::getNext() {
_key,
*_collName);
}
+ _priority.reset();
return trackPlanState(PlanState::IS_EOF);
}
@@ -456,6 +470,7 @@ PlanState ScanStage::getNext() {
if (_scanCallbacks.indexKeyConsistencyCheckCallBack &&
!_scanCallbacks.indexKeyConsistencyCheckCallBack(
_opCtx, _snapshotIdAccessor, _indexIdAccessor, _indexKeyAccessor, _coll, *nextRecord)) {
+ _priority.reset();
return trackPlanState(PlanState::IS_EOF);
}
@@ -562,6 +577,7 @@ void ScanStage::close() {
_cursor.reset();
_randomCursor.reset();
_coll.reset();
+ _priority.reset();
_open = false;
}
@@ -652,6 +668,10 @@ std::vector<DebugPrinter::Block> ScanStage::debugPrint() const {
DebugPrinter::addKeyword(ret, "random");
}
+ if (_lowPriority) {
+ DebugPrinter::addKeyword(ret, "lowPriority");
+ }
+
ret.emplace_back(DebugPrinter::Block("[`"));
for (size_t idx = 0; idx < _fields.size(); ++idx) {
if (idx) {
diff --git a/src/mongo/db/exec/sbe/stages/scan.h b/src/mongo/db/exec/sbe/stages/scan.h
index 6642c4ce1e8..33284950171 100644
--- a/src/mongo/db/exec/sbe/stages/scan.h
+++ b/src/mongo/db/exec/sbe/stages/scan.h
@@ -110,6 +110,7 @@ public:
PlanYieldPolicy* yieldPolicy,
PlanNodeId nodeId,
ScanCallbacks scanCallbacks,
+ bool lowPriority = false,
bool useRandomCursor = false,
bool participateInTrialRunTracking = true);
@@ -232,6 +233,10 @@ private:
RecordId _key;
bool _firstGetNext{false};
+ // Whether the scan should have low storage admission priority.
+ bool _lowPriority;
+ boost::optional<ScopedAdmissionPriorityForLock> _priority;
+
ScanStats _specificStats;
// Flag set upon restoring the stage that indicates whether the cursor's position in the