diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-10-26 14:04:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-26 15:04:24 +0000 |
commit | 3edb2e3a7f7320692899c0a295f7d580e38783a3 (patch) | |
tree | 6ebf69397ae27d62f5ca13aa357695773c955ca2 /src/mongo/util/concurrency/ticketholder.cpp | |
parent | 83db85a8e224faf3e3bb36c610e9b35669429dfe (diff) | |
download | mongo-3edb2e3a7f7320692899c0a295f7d580e38783a3.tar.gz |
SERVER-69935 Introduce PriorityTicketHolder promotion policy
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 138 |
1 files changed, 103 insertions, 35 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index c7c4f303d63..a9c44467536 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -33,6 +33,7 @@ #include "mongo/db/service_context.h" #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/concurrency/ticketholder_params_gen.h" #include <iostream> @@ -380,9 +381,9 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext) : TicketHolderWithQueueingStats(numTickets, serviceContext), - _queues{Queue(this, QueueType::LowPriorityQueue), - Queue(this, QueueType::NormalPriorityQueue), - Queue(this, QueueType::ImmediatePriorityNoOpQueue)}, + _queues{Queue(this, QueueType::kLowPriority), + Queue(this, QueueType::kNormalPriority), + Queue(this, QueueType::kImmediatePriority)}, _serviceContext(serviceContext) { _ticketsAvailable.store(numTickets); @@ -399,6 +400,14 @@ int PriorityTicketHolder::queued() const { return _enqueuedElements.loadRelaxed(); } +std::int64_t PriorityTicketHolder::promoted() const { + return _promotedElements.loadRelaxed(); +} + +std::int64_t PriorityTicketHolder::bypassed() const { + return _lowPriorityBypassCount.loadRelaxed(); +} + void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { // Tickets acquired with priority kImmediate are not generated from the pool of available // tickets, and thus should never be returned to the pool of available tickets. @@ -453,16 +462,28 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC WaitMode waitMode) { invariant(admCtx); - auto& queue = _getQueueToUse(admCtx); + auto queueType = _queueType(admCtx); + auto& queue = _getQueue(queueType); - bool assigned; + AdmissionStatus admissionStatus; { EnqueuerLockGuard enqueuerLock(_queueMutex); + _enqueuedElements.addAndFetch(1); ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - assigned = queue.enqueue(opCtx, enqueuerLock, until, waitMode); + + admissionStatus = queue.enqueue(opCtx, enqueuerLock, until, waitMode); + + if (admissionStatus == AdmissionStatus::kNeedsPromotion) { + invariant(queueType == QueueType::kLowPriority); + + _promotedElements.fetchAndAdd(1); + auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); + admissionStatus = normalPriorityQueue.enqueue(opCtx, enqueuerLock, until, waitMode); + } } - if (assigned) { + + if (admissionStatus == AdmissionStatus::kReadyToAcquire) { return Ticket{this, admCtx}; } else { return boost::none; @@ -472,9 +493,8 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC bool PriorityTicketHolder::_hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, QueueType queue) { switch (queue) { - case QueueType::LowPriorityQueue: { - const auto& normalQueue = - _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)]; + case QueueType::kLowPriority: { + const auto& normalQueue = _getQueue(QueueType::kNormalPriority); auto pending = normalQueue.getThreadsPendingToWake(); return pending != 0 && pending >= _ticketsAvailable.load(); } @@ -523,10 +543,11 @@ void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& en } } -bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx, - EnqueuerLockGuard& enqueuerLock, - const Date_t& until, - WaitMode waitMode) { +PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue( + OperationContext* opCtx, + EnqueuerLockGuard& enqueuerLock, + const Date_t& until, + WaitMode waitMode) { _queuedThreads++; // Before exiting we remove ourselves from the count of queued threads, we are still holding the // lock here so this is safe. @@ -558,36 +579,75 @@ bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx, opCtx->checkForInterrupt(); } if (waitResult == stdx::cv_status::timeout) - return false; + return AdmissionStatus::kInterrupted; + + bool needsPromotion = _signalPromotion.swap(false); + if (needsPromotion) { + // The thread is woken for promotion, which is different than a thread woken to + // acquire a ticket / a product of an interruption. + return PriorityTicketHolder::AdmissionStatus::kNeedsPromotion; + } } } while (!_holder->_tryAcquireTicket()); - return true; + return AdmissionStatus::kReadyToAcquire; +} + +void PriorityTicketHolder::Queue::signalPromoteSingleOp(const ReleaserLockGuard& releaserLock) { + // Only signal a promotion if there exists an operation to promote. + // + // Additionally, only modify _signalPromotion if it was false to begin. This prevents the + // following race (as multiple releasers may call this code concurrently): + // . ReleaserA attempts to dequeue, succeeds, _signalPromotion is set to true + // . ReleaserB attempts to dequeue, fails, _signalPromotion is set to false before the + // woken thread has a chance to check whether it should be promoted. + // + // + bool permittedOriginalValue = false; + _signalPromotion.compareAndSwap(&permittedOriginalValue, attemptToDequeue(releaserLock)); } void PriorityTicketHolder::_dequeueWaitingThread(const ReleaserLockGuard& releaserLock) { - // There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since + // There should never be anything to dequeue from 'QueueType::kImmediatePriority' since // 'kImmediate' operations should always bypass the need to queue. - int currentIndexQueue = static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue) - 1; - while (!_queues[currentIndexQueue].attemptToDequeue(releaserLock)) { - if (currentIndexQueue == 0) - break; - else - currentIndexQueue--; + auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); + if (!normalPriorityQueue.attemptToDequeue(releaserLock)) { + _getQueue(QueueType::kLowPriority).attemptToDequeue(releaserLock); + return; + } + + auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); + if (lowPriorityQueue.queuedElems() == 0) { + // Dequeueing from the normal queue didn't bypass any operations waiting in the low priority + // queue, return early. + return; + } + + // To prevent starvation, record the number of times operations in the low priority queue are + // bypassed in favor of dequeueing from the normal queue. If operations are bypassed enough, it + // may be time to promote a low priority operation to give it a chance to eventually run. + // + // A value of 0 implies low priority operations are never to be promoted. + if (auto lowPriorityOperationPromotionRate = gLowPriorityOperationPromotionRate.load(); + lowPriorityOperationPromotionRate > 0) { + if (_lowPriorityBypassCount.addAndFetch(1) % lowPriorityOperationPromotionRate == 0) { + lowPriorityQueue.signalPromoteSingleOp(releaserLock); + } } } void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { { BSONObjBuilder bbb(b.subobjStart("lowPriority")); - auto& lowPriorityTicketStats = - _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats(); + const auto& lowPriorityTicketStats = _getQueue(QueueType::kLowPriority).getStatsToUse(); appendCommonQueueImplStats(bbb, lowPriorityTicketStats); + bbb.append("promoted", promoted()); + bbb.append("bypassCount", bypassed()); bbb.done(); } { BSONObjBuilder bbb(b.subobjStart("normalPriority")); - auto& normalPriorityTicketStats = - _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats(); + const auto& normalPriorityTicketStats = + _getQueue(QueueType::kNormalPriority).getStatsToUse(); appendCommonQueueImplStats(bbb, normalPriorityTicketStats); bbb.done(); } @@ -595,8 +655,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { BSONObjBuilder bbb(b.subobjStart("immediatePriority")); // Since 'kImmediate' priority operations will never queue, omit queueing statistics that // will always be 0. - auto& immediateTicketStats = - _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)].getStats(); + const auto& immediateTicketStats = _getQueue(QueueType::kImmediatePriority).getStatsToUse(); auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed(); auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed(); @@ -610,22 +669,31 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { } } -PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(const AdmissionContext* admCtx) { +PriorityTicketHolder::QueueType PriorityTicketHolder::_queueType(const AdmissionContext* admCtx) { auto priority = admCtx->getPriority(); switch (priority) { case AdmissionContext::Priority::kLow: - return _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)]; + return QueueType::kLowPriority; case AdmissionContext::Priority::kNormal: - return _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)]; + return QueueType::kNormalPriority; case AdmissionContext::Priority::kImmediate: - return _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)]; + return QueueType::kImmediatePriority; + default: + MONGO_UNREACHABLE; } +} + +PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueue(QueueType queueType) { + return _queues[static_cast<unsigned int>(queueType)]; +} - MONGO_UNREACHABLE; +const PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueue(QueueType queueType) const { + return _queues[static_cast<unsigned int>(queueType)]; } TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( const AdmissionContext* admCtx) noexcept { - return _getQueueToUse(admCtx).getStatsToUse(); + auto queueType = _queueType(admCtx); + return _getQueue(queueType).getStatsToUse(); } } // namespace mongo |