summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.cpp
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-10-26 14:04:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-26 15:04:24 +0000
commit3edb2e3a7f7320692899c0a295f7d580e38783a3 (patch)
tree6ebf69397ae27d62f5ca13aa357695773c955ca2 /src/mongo/util/concurrency/ticketholder.cpp
parent83db85a8e224faf3e3bb36c610e9b35669429dfe (diff)
downloadmongo-3edb2e3a7f7320692899c0a295f7d580e38783a3.tar.gz
SERVER-69935 Introduce PriorityTicketHolder promotion policy
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp138
1 files changed, 103 insertions, 35 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index c7c4f303d63..a9c44467536 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/service_context.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/concurrency/ticketholder_params_gen.h"
#include <iostream>
@@ -380,9 +381,9 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext)
: TicketHolderWithQueueingStats(numTickets, serviceContext),
- _queues{Queue(this, QueueType::LowPriorityQueue),
- Queue(this, QueueType::NormalPriorityQueue),
- Queue(this, QueueType::ImmediatePriorityNoOpQueue)},
+ _queues{Queue(this, QueueType::kLowPriority),
+ Queue(this, QueueType::kNormalPriority),
+ Queue(this, QueueType::kImmediatePriority)},
_serviceContext(serviceContext) {
_ticketsAvailable.store(numTickets);
@@ -399,6 +400,14 @@ int PriorityTicketHolder::queued() const {
return _enqueuedElements.loadRelaxed();
}
+std::int64_t PriorityTicketHolder::promoted() const {
+ return _promotedElements.loadRelaxed();
+}
+
+std::int64_t PriorityTicketHolder::bypassed() const {
+ return _lowPriorityBypassCount.loadRelaxed();
+}
+
void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
// Tickets acquired with priority kImmediate are not generated from the pool of available
// tickets, and thus should never be returned to the pool of available tickets.
@@ -453,16 +462,28 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
WaitMode waitMode) {
invariant(admCtx);
- auto& queue = _getQueueToUse(admCtx);
+ auto queueType = _queueType(admCtx);
+ auto& queue = _getQueue(queueType);
- bool assigned;
+ AdmissionStatus admissionStatus;
{
EnqueuerLockGuard enqueuerLock(_queueMutex);
+
_enqueuedElements.addAndFetch(1);
ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
- assigned = queue.enqueue(opCtx, enqueuerLock, until, waitMode);
+
+ admissionStatus = queue.enqueue(opCtx, enqueuerLock, until, waitMode);
+
+ if (admissionStatus == AdmissionStatus::kNeedsPromotion) {
+ invariant(queueType == QueueType::kLowPriority);
+
+ _promotedElements.fetchAndAdd(1);
+ auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
+ admissionStatus = normalPriorityQueue.enqueue(opCtx, enqueuerLock, until, waitMode);
+ }
}
- if (assigned) {
+
+ if (admissionStatus == AdmissionStatus::kReadyToAcquire) {
return Ticket{this, admCtx};
} else {
return boost::none;
@@ -472,9 +493,8 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
bool PriorityTicketHolder::_hasToWaitForHigherPriority(const EnqueuerLockGuard& lk,
QueueType queue) {
switch (queue) {
- case QueueType::LowPriorityQueue: {
- const auto& normalQueue =
- _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)];
+ case QueueType::kLowPriority: {
+ const auto& normalQueue = _getQueue(QueueType::kNormalPriority);
auto pending = normalQueue.getThreadsPendingToWake();
return pending != 0 && pending >= _ticketsAvailable.load();
}
@@ -523,10 +543,11 @@ void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& en
}
}
-bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx,
- EnqueuerLockGuard& enqueuerLock,
- const Date_t& until,
- WaitMode waitMode) {
+PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue(
+ OperationContext* opCtx,
+ EnqueuerLockGuard& enqueuerLock,
+ const Date_t& until,
+ WaitMode waitMode) {
_queuedThreads++;
// Before exiting we remove ourselves from the count of queued threads, we are still holding the
// lock here so this is safe.
@@ -558,36 +579,75 @@ bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx,
opCtx->checkForInterrupt();
}
if (waitResult == stdx::cv_status::timeout)
- return false;
+ return AdmissionStatus::kInterrupted;
+
+ bool needsPromotion = _signalPromotion.swap(false);
+ if (needsPromotion) {
+ // The thread is woken for promotion, which is different than a thread woken to
+ // acquire a ticket / a product of an interruption.
+ return PriorityTicketHolder::AdmissionStatus::kNeedsPromotion;
+ }
}
} while (!_holder->_tryAcquireTicket());
- return true;
+ return AdmissionStatus::kReadyToAcquire;
+}
+
+void PriorityTicketHolder::Queue::signalPromoteSingleOp(const ReleaserLockGuard& releaserLock) {
+ // Only signal a promotion if there exists an operation to promote.
+ //
+ // Additionally, only modify _signalPromotion if it was false to begin. This prevents the
+ // following race (as multiple releasers may call this code concurrently):
+ // . ReleaserA attempts to dequeue, succeeds, _signalPromotion is set to true
+ // . ReleaserB attempts to dequeue, fails, _signalPromotion is set to false before the
+ // woken thread has a chance to check whether it should be promoted.
+ //
+ //
+ bool permittedOriginalValue = false;
+ _signalPromotion.compareAndSwap(&permittedOriginalValue, attemptToDequeue(releaserLock));
}
void PriorityTicketHolder::_dequeueWaitingThread(const ReleaserLockGuard& releaserLock) {
- // There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since
+ // There should never be anything to dequeue from 'QueueType::kImmediatePriority' since
// 'kImmediate' operations should always bypass the need to queue.
- int currentIndexQueue = static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue) - 1;
- while (!_queues[currentIndexQueue].attemptToDequeue(releaserLock)) {
- if (currentIndexQueue == 0)
- break;
- else
- currentIndexQueue--;
+ auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
+ if (!normalPriorityQueue.attemptToDequeue(releaserLock)) {
+ _getQueue(QueueType::kLowPriority).attemptToDequeue(releaserLock);
+ return;
+ }
+
+ auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority);
+ if (lowPriorityQueue.queuedElems() == 0) {
+ // Dequeueing from the normal queue didn't bypass any operations waiting in the low priority
+ // queue, return early.
+ return;
+ }
+
+ // To prevent starvation, record the number of times operations in the low priority queue are
+ // bypassed in favor of dequeueing from the normal queue. If operations are bypassed enough, it
+ // may be time to promote a low priority operation to give it a chance to eventually run.
+ //
+ // A value of 0 implies low priority operations are never to be promoted.
+ if (auto lowPriorityOperationPromotionRate = gLowPriorityOperationPromotionRate.load();
+ lowPriorityOperationPromotionRate > 0) {
+ if (_lowPriorityBypassCount.addAndFetch(1) % lowPriorityOperationPromotionRate == 0) {
+ lowPriorityQueue.signalPromoteSingleOp(releaserLock);
+ }
}
}
void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
{
BSONObjBuilder bbb(b.subobjStart("lowPriority"));
- auto& lowPriorityTicketStats =
- _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats();
+ const auto& lowPriorityTicketStats = _getQueue(QueueType::kLowPriority).getStatsToUse();
appendCommonQueueImplStats(bbb, lowPriorityTicketStats);
+ bbb.append("promoted", promoted());
+ bbb.append("bypassCount", bypassed());
bbb.done();
}
{
BSONObjBuilder bbb(b.subobjStart("normalPriority"));
- auto& normalPriorityTicketStats =
- _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats();
+ const auto& normalPriorityTicketStats =
+ _getQueue(QueueType::kNormalPriority).getStatsToUse();
appendCommonQueueImplStats(bbb, normalPriorityTicketStats);
bbb.done();
}
@@ -595,8 +655,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
BSONObjBuilder bbb(b.subobjStart("immediatePriority"));
// Since 'kImmediate' priority operations will never queue, omit queueing statistics that
// will always be 0.
- auto& immediateTicketStats =
- _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)].getStats();
+ const auto& immediateTicketStats = _getQueue(QueueType::kImmediatePriority).getStatsToUse();
auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed();
auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed();
@@ -610,22 +669,31 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
}
}
-PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(const AdmissionContext* admCtx) {
+PriorityTicketHolder::QueueType PriorityTicketHolder::_queueType(const AdmissionContext* admCtx) {
auto priority = admCtx->getPriority();
switch (priority) {
case AdmissionContext::Priority::kLow:
- return _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)];
+ return QueueType::kLowPriority;
case AdmissionContext::Priority::kNormal:
- return _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)];
+ return QueueType::kNormalPriority;
case AdmissionContext::Priority::kImmediate:
- return _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)];
+ return QueueType::kImmediatePriority;
+ default:
+ MONGO_UNREACHABLE;
}
+}
+
+PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueue(QueueType queueType) {
+ return _queues[static_cast<unsigned int>(queueType)];
+}
- MONGO_UNREACHABLE;
+const PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueue(QueueType queueType) const {
+ return _queues[static_cast<unsigned int>(queueType)];
}
TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
const AdmissionContext* admCtx) noexcept {
- return _getQueueToUse(admCtx).getStatsToUse();
+ auto queueType = _queueType(admCtx);
+ return _getQueue(queueType).getStatsToUse();
}
} // namespace mongo