diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-11-08 10:37:02 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-08 11:10:15 +0000 |
commit | 25329194fe4343fd4ef1f3423377da742a47d5d6 (patch) | |
tree | a80644d0d7faf7172fb64c7dfa850dfe4728249b /src/mongo/util/concurrency/ticketholder.h | |
parent | 5e1d12241178f6c6520d57476552b362b5ddf237 (diff) | |
download | mongo-25329194fe4343fd4ef1f3423377da742a47d5d6.tar.gz |
SERVER-70927 Make PriorityTicketHolder select where to dequeue
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 71 |
1 files changed, 39 insertions, 32 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 834aa28f1fe..3b0153d1a85 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -284,7 +284,9 @@ private: class PriorityTicketHolder : public TicketHolderWithQueueingStats { protected: public: - explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext); + explicit PriorityTicketHolder(int numTickets, + int lowPriorityBypassThreshold, + ServiceContext* serviceContext); ~PriorityTicketHolder() override; int available() const override final; @@ -295,7 +297,13 @@ public: return true; }; - std::int64_t promoted() const; + void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); + + /** + * Number of times low priority operations are expedited for ticket admission over normal + * priority operations. + */ + std::int64_t expedited() const; /** * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the @@ -314,9 +322,9 @@ private: // // The alternative of using ResourceMutex is not appropriate as the class serves as a // concurrency primitive and is performance sensitive. - using QueueMutex = std::shared_mutex; // NOLINT - using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT - using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT + using QueueMutex = std::shared_mutex; // NOLINT + using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT + using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT enum class QueueType : unsigned int { kLowPriority = 0, @@ -327,39 +335,27 @@ private: QueueTypeSize = 3 }; - enum class AdmissionStatus { - // The ticket request is successful, and the operation is ready to acquire a ticket. - kReadyToAcquire, - // There are no tickets available for the request at its given priority. The priority should - // be increased to allow for ticket acquisition. - kNeedsPromotion, - // An interruption occured while attempting to acquire a ticket. - kInterrupted, - }; - class Queue { public: Queue(PriorityTicketHolder* holder, QueueType queueType) : _holder(holder), _queueType(queueType){}; - bool attemptToDequeue(const ReleaserLockGuard& releaserLock); + bool attemptToDequeue(const SharedLockGuard& sharedQueueLock); - AdmissionStatus enqueue(OperationContext* interruptible, - EnqueuerLockGuard& queueLock, - const Date_t& until, - WaitMode waitMode); + /** + * Returns true if this operation is assigned a ticket, false if the deadline is exceeded + * before ticket acquisition. Throws if the operation is interrupted. + */ + bool enqueue(OperationContext* interruptible, + UniqueLockGuard& queueLock, + const Date_t& until, + WaitMode waitMode); int queuedElems() const { return _queuedThreads; } /** - * Signals that a queued thread should be woken and exit the queue so it can be promoted to - * a higher priority queue. - */ - void signalPromoteSingleOp(const ReleaserLockGuard& releaserLock); - - /** * Returns a reference to the Queue statistics that allows callers to update the statistics. */ QueueStats& getStatsToUse() { @@ -374,11 +370,10 @@ private: } private: - void _signalThreadWoken(const EnqueuerLockGuard& enqueuerLock); + void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock); int _queuedThreads{0}; AtomicWord<int> _threadsToBeWoken{0}; - AtomicWord<bool> _signalPromotion{false}; stdx::condition_variable _cv; PriorityTicketHolder* _holder; @@ -416,13 +411,13 @@ private: * - The number of items in each queue will not change during the execution * - No other thread will proceed to wait during the execution of the method */ - void _dequeueWaitingThread(const ReleaserLockGuard& releaserLock); + void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock); /** * Returns whether there are higher priority threads pending to get a ticket in front of the * given queue type and not enough tickets for all of them. */ - bool _hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, QueueType queue); + bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue); QueueType _queueType(const AdmissionContext* admCtx); @@ -434,13 +429,25 @@ private: QueueMutex _queueMutex; /** + * Limits the number times the low priority queue is non-empty and bypassed in favor of the + * normal priority queue for the next ticket admission. + * + * Updates must be done under the UniqueLockGuard. + */ + int _lowPriorityBypassThreshold; + + /** * Counts the number of times normal operations are dequeued over operations queued in the low * priority queue. */ - AtomicWord<std::int64_t> _lowPriorityBypassCount{0}; + AtomicWord<std::uint64_t> _lowPriorityBypassCount{0}; + + /** + * Number of times ticket admission is expedited for low priority operations. + */ + AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0}; AtomicWord<int> _ticketsAvailable; AtomicWord<int> _enqueuedElements; - AtomicWord<std::int64_t> _promotedElements{0}; ServiceContext* _serviceContext; }; |