diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-11-11 14:18:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-11 15:12:03 +0000 |
commit | 977cf573cb8e924390e6f9108a1f9d28caf02199 (patch) | |
tree | dd46f4799214082e46e20285f8f5447ee4d6608d /src/mongo/util/concurrency/ticketholder.h | |
parent | 4802a8855efac9a9d31f0a3c47b3594df5833724 (diff) | |
download | mongo-977cf573cb8e924390e6f9108a1f9d28caf02199.tar.gz |
SERVER-70792 Separate Queue from PriorityTicketHolder
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 128 |
1 files changed, 48 insertions, 80 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 3b0153d1a85..a87dacca15f 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -41,6 +41,7 @@ #include "mongo/stdx/future.h" #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/mutex.h" +#include "mongo/util/concurrency/ticket_queues.h" #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/time_support.h" @@ -277,55 +278,48 @@ private: }; /** - * A ticketholder implementation that centralises all ticket acquisition/releases. - * Waiters will get placed in a specific internal queue according to some logic. - * Releasers will wake up a waiter from a group chosen according to some logic. + * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get + * placed in a specific internal queue according to some logic. Releasers will wake up a waiter + * from a group chosen according to some logic. */ class PriorityTicketHolder : public TicketHolderWithQueueingStats { -protected: public: explicit PriorityTicketHolder(int numTickets, int lowPriorityBypassThreshold, ServiceContext* serviceContext); - ~PriorityTicketHolder() override; + ~PriorityTicketHolder() override{}; - int available() const override final; + int available() const override final { + return _ticketsAvailable.load(); + }; - int queued() const override final; + int queued() const override final { + return _enqueuedElements.loadRelaxed(); + } bool recordImmediateTicketStatistics() noexcept override final { return true; }; - void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); - /** * Number of times low priority operations are expedited for ticket admission over normal * priority operations. */ - std::int64_t expedited() const; + std::int64_t expedited() const { + return _expeditedLowPriorityAdmissions.loadRelaxed(); + } /** * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the * normal priority queue when a ticket becomes available. */ - std::int64_t bypassed() const; + std::int64_t bypassed() const { + return _lowPriorityBypassCount.loadRelaxed(); + }; -private: - // Using a shared_mutex is fine here because usual considerations for avoiding them do not apply - // in this case: - // * Operations are short and do not block while holding the lock (i.e. they only do CPU-bound - // work) - // * Writer starvation is not possible as there are a finite number of operations to be - // performed in the reader case. Once all tickets get released no other thread can take the - // shared lock. - // - // The alternative of using ResourceMutex is not appropriate as the class serves as a - // concurrency primitive and is performance sensitive. - using QueueMutex = std::shared_mutex; // NOLINT - using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT - using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT + void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); +private: enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, @@ -335,53 +329,6 @@ private: QueueTypeSize = 3 }; - class Queue { - public: - Queue(PriorityTicketHolder* holder, QueueType queueType) - : _holder(holder), _queueType(queueType){}; - - bool attemptToDequeue(const SharedLockGuard& sharedQueueLock); - - /** - * Returns true if this operation is assigned a ticket, false if the deadline is exceeded - * before ticket acquisition. Throws if the operation is interrupted. - */ - bool enqueue(OperationContext* interruptible, - UniqueLockGuard& queueLock, - const Date_t& until, - WaitMode waitMode); - - int queuedElems() const { - return _queuedThreads; - } - - /** - * Returns a reference to the Queue statistics that allows callers to update the statistics. - */ - QueueStats& getStatsToUse() { - return _stats; - } - const QueueStats& getStatsToUse() const { - return _stats; - } - - int getThreadsPendingToWake() const { - return _threadsToBeWoken.load(); - } - - private: - void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock); - - int _queuedThreads{0}; - AtomicWord<int> _threadsToBeWoken{0}; - - stdx::condition_variable _cv; - PriorityTicketHolder* _holder; - QueueStats _stats; - const QueueType _queueType; - }; - - boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, @@ -411,28 +358,49 @@ private: * - The number of items in each queue will not change during the execution * - No other thread will proceed to wait during the execution of the method */ - void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock); + void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock); /** * Returns whether there are higher priority threads pending to get a ticket in front of the * given queue type and not enough tickets for all of them. */ - bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue); + bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType); + + unsigned int _enumToInt(QueueType queueType) { + return static_cast<unsigned int>(queueType); + } + unsigned int _enumToInt(QueueType queueType) const { + return static_cast<unsigned int>(queueType); + } - QueueType _queueType(const AdmissionContext* admCtx); + ticket_queues::Queue& _getQueue(QueueType queueType) { + return _queues[_enumToInt(queueType)]; + } - Queue& _getQueue(QueueType queueType); - const Queue& _getQueue(QueueType queueType) const; - std::array<Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues; + QueueType _getQueueType(const AdmissionContext* admCtx) { + auto priority = admCtx->getPriority(); + switch (priority) { + case AdmissionContext::Priority::kLow: + return QueueType::kLowPriority; + case AdmissionContext::Priority::kNormal: + return QueueType::kNormalPriority; + case AdmissionContext::Priority::kImmediate: + return QueueType::kImmediatePriority; + default: + MONGO_UNREACHABLE; + } + } - QueueMutex _queueMutex; + ticket_queues::QueueMutex _queueMutex; + std::array<ticket_queues::Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues; + std::array<QueueStats, static_cast<unsigned int>(QueueType::QueueTypeSize)> _stats; /** * Limits the number times the low priority queue is non-empty and bypassed in favor of the * normal priority queue for the next ticket admission. * - * Updates must be done under the UniqueLockGuard. + * Updates must be done under the ticket_queues::UniqueLockGuard. */ int _lowPriorityBypassThreshold; |