summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r--src/mongo/util/concurrency/ticketholder.h200
1 files changed, 4 insertions, 196 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index a87dacca15f..5d16ef1ae25 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -28,12 +28,6 @@
*/
#pragma once
-#if defined(__linux__)
-#include <semaphore.h>
-#endif
-
-#include <queue>
-
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
@@ -41,13 +35,14 @@
#include "mongo/stdx/future.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/concurrency/ticket_queues.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
namespace mongo {
class Ticket;
+class PriorityTicketHolder;
+class SemaphoreTicketHolder;
/**
* Maintains and distributes tickets across operations from a limited pool of tickets. The ticketing
@@ -224,198 +219,11 @@ private:
AtomicWord<int> _outof;
protected:
- ServiceContext* _serviceContext;
-};
-
-class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats {
-public:
- explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext);
- ~SemaphoreTicketHolder() override final;
-
- int available() const override final;
-
- int queued() const override final {
- auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
- auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
- return std::max(static_cast<int>(added - removed), 0);
- };
-
- bool recordImmediateTicketStatistics() noexcept override final {
- // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing
- // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked
- // in the statistics.
- return false;
- }
-
-private:
- boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) override final;
-
- boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
- void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
-
- void _appendImplStats(BSONObjBuilder& b) const override final;
-
- void _resize(int newSize, int oldSize) noexcept override final;
-
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final {
- return _semaphoreStats;
- }
-#if defined(__linux__)
- mutable sem_t _sem;
-
-#else
- bool _tryAcquire();
-
- int _numTickets;
- Mutex _mutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex");
- stdx::condition_variable _newTicket;
-#endif
- QueueStats _semaphoreStats;
-};
-
-/**
- * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get
- * placed in a specific internal queue according to some logic. Releasers will wake up a waiter
- * from a group chosen according to some logic.
- */
-class PriorityTicketHolder : public TicketHolderWithQueueingStats {
-public:
- explicit PriorityTicketHolder(int numTickets,
- int lowPriorityBypassThreshold,
- ServiceContext* serviceContext);
- ~PriorityTicketHolder() override{};
-
- int available() const override final {
- return _ticketsAvailable.load();
- };
-
- int queued() const override final {
- return _enqueuedElements.loadRelaxed();
- }
-
- bool recordImmediateTicketStatistics() noexcept override final {
- return true;
- };
-
/**
- * Number of times low priority operations are expedited for ticket admission over normal
- * priority operations.
+ * Appends the standard statistics stored in QueueStats to BSONObjBuilder b;
*/
- std::int64_t expedited() const {
- return _expeditedLowPriorityAdmissions.loadRelaxed();
- }
+ void _appendCommonQueueImplStats(BSONObjBuilder& b, const QueueStats& stats) const;
- /**
- * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
- * normal priority queue when a ticket becomes available.
- */
- std::int64_t bypassed() const {
- return _lowPriorityBypassCount.loadRelaxed();
- };
-
- void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
-
-private:
- enum class QueueType : unsigned int {
- kLowPriority = 0,
- kNormalPriority = 1,
- // Exclusively used for statistics tracking. This queue should never have any processes
- // 'queued'.
- kImmediatePriority = 2,
- QueueTypeSize = 3
- };
-
- boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
-
- boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) override final;
-
- void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
-
- void _resize(int newSize, int oldSize) noexcept override final;
-
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
-
- void _appendImplStats(BSONObjBuilder& b) const override final;
-
- bool _tryAcquireTicket();
-
- /**
- * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
- * Implementors MUST wake at least one waiting thread if at least one thread is pending to be
- * woken between all the queues. In other words, attemptToDequeue on each non-empty Queue must
- * be called until either it returns true at least once or has been called on all queues.
- *
- * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block.
- *
- * When called the following invariants will be held:
- * - The number of items in each queue will not change during the execution
- * - No other thread will proceed to wait during the execution of the method
- */
- void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock);
-
- /**
- * Returns whether there are higher priority threads pending to get a ticket in front of the
- * given queue type and not enough tickets for all of them.
- */
- bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType);
-
- unsigned int _enumToInt(QueueType queueType) {
- return static_cast<unsigned int>(queueType);
- }
- unsigned int _enumToInt(QueueType queueType) const {
- return static_cast<unsigned int>(queueType);
- }
-
- ticket_queues::Queue& _getQueue(QueueType queueType) {
- return _queues[_enumToInt(queueType)];
- }
-
-
- QueueType _getQueueType(const AdmissionContext* admCtx) {
- auto priority = admCtx->getPriority();
- switch (priority) {
- case AdmissionContext::Priority::kLow:
- return QueueType::kLowPriority;
- case AdmissionContext::Priority::kNormal:
- return QueueType::kNormalPriority;
- case AdmissionContext::Priority::kImmediate:
- return QueueType::kImmediatePriority;
- default:
- MONGO_UNREACHABLE;
- }
- }
-
- ticket_queues::QueueMutex _queueMutex;
- std::array<ticket_queues::Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues;
- std::array<QueueStats, static_cast<unsigned int>(QueueType::QueueTypeSize)> _stats;
-
- /**
- * Limits the number times the low priority queue is non-empty and bypassed in favor of the
- * normal priority queue for the next ticket admission.
- *
- * Updates must be done under the ticket_queues::UniqueLockGuard.
- */
- int _lowPriorityBypassThreshold;
-
- /**
- * Counts the number of times normal operations are dequeued over operations queued in the low
- * priority queue.
- */
- AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
-
- /**
- * Number of times ticket admission is expedited for low priority operations.
- */
- AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
- AtomicWord<int> _ticketsAvailable;
- AtomicWord<int> _enqueuedElements;
ServiceContext* _serviceContext;
};