summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.h
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-11-11 14:18:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-11 15:12:03 +0000
commit977cf573cb8e924390e6f9108a1f9d28caf02199 (patch)
treedd46f4799214082e46e20285f8f5447ee4d6608d /src/mongo/util/concurrency/ticketholder.h
parent4802a8855efac9a9d31f0a3c47b3594df5833724 (diff)
downloadmongo-977cf573cb8e924390e6f9108a1f9d28caf02199.tar.gz
SERVER-70792 Separate Queue from PriorityTicketHolder
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r--src/mongo/util/concurrency/ticketholder.h128
1 files changed, 48 insertions, 80 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 3b0153d1a85..a87dacca15f 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -41,6 +41,7 @@
#include "mongo/stdx/future.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/concurrency/ticket_queues.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
@@ -277,55 +278,48 @@ private:
};
/**
- * A ticketholder implementation that centralises all ticket acquisition/releases.
- * Waiters will get placed in a specific internal queue according to some logic.
- * Releasers will wake up a waiter from a group chosen according to some logic.
+ * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get
+ * placed in a specific internal queue according to some logic. Releasers will wake up a waiter
+ * from a group chosen according to some logic.
*/
class PriorityTicketHolder : public TicketHolderWithQueueingStats {
-protected:
public:
explicit PriorityTicketHolder(int numTickets,
int lowPriorityBypassThreshold,
ServiceContext* serviceContext);
- ~PriorityTicketHolder() override;
+ ~PriorityTicketHolder() override{};
- int available() const override final;
+ int available() const override final {
+ return _ticketsAvailable.load();
+ };
- int queued() const override final;
+ int queued() const override final {
+ return _enqueuedElements.loadRelaxed();
+ }
bool recordImmediateTicketStatistics() noexcept override final {
return true;
};
- void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
-
/**
* Number of times low priority operations are expedited for ticket admission over normal
* priority operations.
*/
- std::int64_t expedited() const;
+ std::int64_t expedited() const {
+ return _expeditedLowPriorityAdmissions.loadRelaxed();
+ }
/**
* Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
* normal priority queue when a ticket becomes available.
*/
- std::int64_t bypassed() const;
+ std::int64_t bypassed() const {
+ return _lowPriorityBypassCount.loadRelaxed();
+ };
-private:
- // Using a shared_mutex is fine here because usual considerations for avoiding them do not apply
- // in this case:
- // * Operations are short and do not block while holding the lock (i.e. they only do CPU-bound
- // work)
- // * Writer starvation is not possible as there are a finite number of operations to be
- // performed in the reader case. Once all tickets get released no other thread can take the
- // shared lock.
- //
- // The alternative of using ResourceMutex is not appropriate as the class serves as a
- // concurrency primitive and is performance sensitive.
- using QueueMutex = std::shared_mutex; // NOLINT
- using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT
- using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT
+ void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
+private:
enum class QueueType : unsigned int {
kLowPriority = 0,
kNormalPriority = 1,
@@ -335,53 +329,6 @@ private:
QueueTypeSize = 3
};
- class Queue {
- public:
- Queue(PriorityTicketHolder* holder, QueueType queueType)
- : _holder(holder), _queueType(queueType){};
-
- bool attemptToDequeue(const SharedLockGuard& sharedQueueLock);
-
- /**
- * Returns true if this operation is assigned a ticket, false if the deadline is exceeded
- * before ticket acquisition. Throws if the operation is interrupted.
- */
- bool enqueue(OperationContext* interruptible,
- UniqueLockGuard& queueLock,
- const Date_t& until,
- WaitMode waitMode);
-
- int queuedElems() const {
- return _queuedThreads;
- }
-
- /**
- * Returns a reference to the Queue statistics that allows callers to update the statistics.
- */
- QueueStats& getStatsToUse() {
- return _stats;
- }
- const QueueStats& getStatsToUse() const {
- return _stats;
- }
-
- int getThreadsPendingToWake() const {
- return _threadsToBeWoken.load();
- }
-
- private:
- void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock);
-
- int _queuedThreads{0};
- AtomicWord<int> _threadsToBeWoken{0};
-
- stdx::condition_variable _cv;
- PriorityTicketHolder* _holder;
- QueueStats _stats;
- const QueueType _queueType;
- };
-
-
boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
@@ -411,28 +358,49 @@ private:
* - The number of items in each queue will not change during the execution
* - No other thread will proceed to wait during the execution of the method
*/
- void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock);
+ void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock);
/**
* Returns whether there are higher priority threads pending to get a ticket in front of the
* given queue type and not enough tickets for all of them.
*/
- bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue);
+ bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType);
+
+ unsigned int _enumToInt(QueueType queueType) {
+ return static_cast<unsigned int>(queueType);
+ }
+ unsigned int _enumToInt(QueueType queueType) const {
+ return static_cast<unsigned int>(queueType);
+ }
- QueueType _queueType(const AdmissionContext* admCtx);
+ ticket_queues::Queue& _getQueue(QueueType queueType) {
+ return _queues[_enumToInt(queueType)];
+ }
- Queue& _getQueue(QueueType queueType);
- const Queue& _getQueue(QueueType queueType) const;
- std::array<Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues;
+ QueueType _getQueueType(const AdmissionContext* admCtx) {
+ auto priority = admCtx->getPriority();
+ switch (priority) {
+ case AdmissionContext::Priority::kLow:
+ return QueueType::kLowPriority;
+ case AdmissionContext::Priority::kNormal:
+ return QueueType::kNormalPriority;
+ case AdmissionContext::Priority::kImmediate:
+ return QueueType::kImmediatePriority;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
- QueueMutex _queueMutex;
+ ticket_queues::QueueMutex _queueMutex;
+ std::array<ticket_queues::Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues;
+ std::array<QueueStats, static_cast<unsigned int>(QueueType::QueueTypeSize)> _stats;
/**
* Limits the number times the low priority queue is non-empty and bypassed in favor of the
* normal priority queue for the next ticket admission.
*
- * Updates must be done under the UniqueLockGuard.
+ * Updates must be done under the ticket_queues::UniqueLockGuard.
*/
int _lowPriorityBypassThreshold;