summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.h
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-11-08 10:37:02 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-08 11:10:15 +0000
commit25329194fe4343fd4ef1f3423377da742a47d5d6 (patch)
treea80644d0d7faf7172fb64c7dfa850dfe4728249b /src/mongo/util/concurrency/ticketholder.h
parent5e1d12241178f6c6520d57476552b362b5ddf237 (diff)
downloadmongo-25329194fe4343fd4ef1f3423377da742a47d5d6.tar.gz
SERVER-70927 Make PriorityTicketHolder select where to dequeue
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r--src/mongo/util/concurrency/ticketholder.h71
1 files changed, 39 insertions, 32 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 834aa28f1fe..3b0153d1a85 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -284,7 +284,9 @@ private:
class PriorityTicketHolder : public TicketHolderWithQueueingStats {
protected:
public:
- explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext);
+ explicit PriorityTicketHolder(int numTickets,
+ int lowPriorityBypassThreshold,
+ ServiceContext* serviceContext);
~PriorityTicketHolder() override;
int available() const override final;
@@ -295,7 +297,13 @@ public:
return true;
};
- std::int64_t promoted() const;
+ void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
+
+ /**
+ * Number of times low priority operations are expedited for ticket admission over normal
+ * priority operations.
+ */
+ std::int64_t expedited() const;
/**
* Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
@@ -314,9 +322,9 @@ private:
//
// The alternative of using ResourceMutex is not appropriate as the class serves as a
// concurrency primitive and is performance sensitive.
- using QueueMutex = std::shared_mutex; // NOLINT
- using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT
- using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT
+ using QueueMutex = std::shared_mutex; // NOLINT
+ using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT
+ using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT
enum class QueueType : unsigned int {
kLowPriority = 0,
@@ -327,39 +335,27 @@ private:
QueueTypeSize = 3
};
- enum class AdmissionStatus {
- // The ticket request is successful, and the operation is ready to acquire a ticket.
- kReadyToAcquire,
- // There are no tickets available for the request at its given priority. The priority should
- // be increased to allow for ticket acquisition.
- kNeedsPromotion,
- // An interruption occured while attempting to acquire a ticket.
- kInterrupted,
- };
-
class Queue {
public:
Queue(PriorityTicketHolder* holder, QueueType queueType)
: _holder(holder), _queueType(queueType){};
- bool attemptToDequeue(const ReleaserLockGuard& releaserLock);
+ bool attemptToDequeue(const SharedLockGuard& sharedQueueLock);
- AdmissionStatus enqueue(OperationContext* interruptible,
- EnqueuerLockGuard& queueLock,
- const Date_t& until,
- WaitMode waitMode);
+ /**
+ * Returns true if this operation is assigned a ticket, false if the deadline is exceeded
+ * before ticket acquisition. Throws if the operation is interrupted.
+ */
+ bool enqueue(OperationContext* interruptible,
+ UniqueLockGuard& queueLock,
+ const Date_t& until,
+ WaitMode waitMode);
int queuedElems() const {
return _queuedThreads;
}
/**
- * Signals that a queued thread should be woken and exit the queue so it can be promoted to
- * a higher priority queue.
- */
- void signalPromoteSingleOp(const ReleaserLockGuard& releaserLock);
-
- /**
* Returns a reference to the Queue statistics that allows callers to update the statistics.
*/
QueueStats& getStatsToUse() {
@@ -374,11 +370,10 @@ private:
}
private:
- void _signalThreadWoken(const EnqueuerLockGuard& enqueuerLock);
+ void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock);
int _queuedThreads{0};
AtomicWord<int> _threadsToBeWoken{0};
- AtomicWord<bool> _signalPromotion{false};
stdx::condition_variable _cv;
PriorityTicketHolder* _holder;
@@ -416,13 +411,13 @@ private:
* - The number of items in each queue will not change during the execution
* - No other thread will proceed to wait during the execution of the method
*/
- void _dequeueWaitingThread(const ReleaserLockGuard& releaserLock);
+ void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock);
/**
* Returns whether there are higher priority threads pending to get a ticket in front of the
* given queue type and not enough tickets for all of them.
*/
- bool _hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, QueueType queue);
+ bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue);
QueueType _queueType(const AdmissionContext* admCtx);
@@ -434,13 +429,25 @@ private:
QueueMutex _queueMutex;
/**
+ * Limits the number times the low priority queue is non-empty and bypassed in favor of the
+ * normal priority queue for the next ticket admission.
+ *
+ * Updates must be done under the UniqueLockGuard.
+ */
+ int _lowPriorityBypassThreshold;
+
+ /**
* Counts the number of times normal operations are dequeued over operations queued in the low
* priority queue.
*/
- AtomicWord<std::int64_t> _lowPriorityBypassCount{0};
+ AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+
+ /**
+ * Number of times ticket admission is expedited for low priority operations.
+ */
+ AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
AtomicWord<int> _ticketsAvailable;
AtomicWord<int> _enqueuedElements;
- AtomicWord<std::int64_t> _promotedElements{0};
ServiceContext* _serviceContext;
};