diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-09-23 09:39:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-23 10:37:17 +0000 |
commit | 509d5d5bedab5c2b52df7b4e6458ef2a1cd607b3 (patch) | |
tree | b1d2eed62e605f50e00c72d2c635952d0d31b971 /src | |
parent | 2810900e2b04bce2170b2def8cfdcc22512e65f8 (diff) | |
download | mongo-509d5d5bedab5c2b52df7b4e6458ef2a1cd607b3.tar.gz |
SERVER-69894 Combine PriorityTicketHolder and SchedulingTicketHolder
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 106 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 84 |
2 files changed, 80 insertions, 110 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index d910f292129..554a5f1c888 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -67,6 +67,28 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); } +/** + * Appends the standard statistics stored in QueueStats to BSONObjBuilder b; + */ +void appendCommonQueueImplStats(BSONObjBuilder& b, + const TicketHolderWithQueueingStats::QueueStats& stats) { + auto removed = stats.totalRemovedQueue.loadRelaxed(); + auto added = stats.totalAddedQueue.loadRelaxed(); + + b.append("addedToQueue", added); + b.append("removedFromQueue", removed); + b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); + + auto finished = stats.totalFinishedProcessing.loadRelaxed(); + auto started = stats.totalStartedProcessing.loadRelaxed(); + b.append("startedProcessing", started); + b.append("processing", std::max(static_cast<int>(started - finished), 0)); + b.append("finishedProcessing", finished); + b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); + b.append("canceled", stats.totalCanceled.loadRelaxed()); + b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); + b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); +} } // namespace TicketHolder* TicketHolder::get(ServiceContext* svcCtx) { @@ -283,20 +305,7 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat } void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); - auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); - b.append("addedToQueue", added); - b.append("removedFromQueue", removed); - b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); - auto finished = _semaphoreStats.totalFinishedProcessing.loadRelaxed(); - auto started = _semaphoreStats.totalStartedProcessing.loadRelaxed(); - b.append("startedProcessing", started); - b.append("processing", std::max(static_cast<int>(started - finished), 0)); - b.append("finishedProcessing", finished); - b.append("totalTimeProcessingMicros", _semaphoreStats.totalTimeProcessingMicros.loadRelaxed()); - b.append("canceled", _semaphoreStats.totalCanceled.loadRelaxed()); - b.append("newAdmissions", _semaphoreStats.totalNewAdmissions.loadRelaxed()); - b.append("totalTimeQueuedMicros", _semaphoreStats.totalTimeQueuedMicros.loadRelaxed()); + appendCommonQueueImplStats(b, _semaphoreStats); } #if defined(__linux__) namespace { @@ -486,11 +495,9 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { } #endif -SchedulingTicketHolder::SchedulingTicketHolder(int numTickets, - unsigned int numQueues, - ServiceContext* serviceContext) +PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext) : TicketHolderWithQueueingStats(numTickets, serviceContext), _serviceContext(serviceContext) { - for (std::size_t i = 0; i < numQueues; i++) { + for (std::size_t i = 0; i < static_cast<unsigned int>(QueueType::QueueTypeSize); i++) { _queues.emplace_back(this); } _queues.shrink_to_fit(); @@ -498,17 +505,17 @@ SchedulingTicketHolder::SchedulingTicketHolder(int numTickets, _enqueuedElements.store(0); } -SchedulingTicketHolder::~SchedulingTicketHolder() {} +PriorityTicketHolder::~PriorityTicketHolder() {} -int SchedulingTicketHolder::available() const { +int PriorityTicketHolder::available() const { return _ticketsAvailable.load(); } -int SchedulingTicketHolder::queued() const { +int PriorityTicketHolder::queued() const { return _enqueuedElements.loadRelaxed(); } -void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { +void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { // Tickets acquired with priority kImmediate are not generated from the pool of available // tickets, and thus should never be returned to the pool of available tickets. invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); @@ -533,7 +540,7 @@ void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) _dequeueWaitingThread(); } -bool SchedulingTicketHolder::_tryAcquireTicket() { +bool PriorityTicketHolder::_tryAcquireTicket() { auto remaining = _ticketsAvailable.subtractAndFetch(1); if (remaining < 0) { _ticketsAvailable.addAndFetch(1); @@ -542,7 +549,7 @@ bool SchedulingTicketHolder::_tryAcquireTicket() { return true; } -boost::optional<Ticket> SchedulingTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { +boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { invariant(admCtx); auto hasAcquired = _tryAcquireTicket(); @@ -552,10 +559,10 @@ boost::optional<Ticket> SchedulingTicketHolder::_tryAcquireImpl(AdmissionContext return boost::none; } -boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { +boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) { invariant(admCtx); auto& queue = _getQueueToUse(admCtx); @@ -574,7 +581,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio } } -void SchedulingTicketHolder::_resize(int newSize, int oldSize) noexcept { +void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { auto difference = newSize - oldSize; _ticketsAvailable.fetchAndAdd(difference); @@ -592,7 +599,7 @@ void SchedulingTicketHolder::_resize(int newSize, int oldSize) noexcept { // have to wait until the current ticket holders release their tickets. } -bool SchedulingTicketHolder::Queue::attemptToDequeue() { +bool PriorityTicketHolder::Queue::attemptToDequeue() { auto threadsToBeWoken = _threadsToBeWoken.load(); while (threadsToBeWoken < _queuedThreads) { auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1); @@ -604,7 +611,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() { return false; } -void SchedulingTicketHolder::Queue::_signalThreadWoken() { +void PriorityTicketHolder::Queue::_signalThreadWoken() { auto currentThreadsToBeWoken = _threadsToBeWoken.load(); while (currentThreadsToBeWoken > 0) { if (_threadsToBeWoken.compareAndSwap(¤tThreadsToBeWoken, @@ -614,10 +621,10 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() { } } -bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx, - EnqueuerLockGuard& queueLock, - const Date_t& until, - WaitMode waitMode) { +bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx, + EnqueuerLockGuard& queueLock, + const Date_t& until, + WaitMode waitMode) { _queuedThreads++; // Before exiting we remove ourselves from the count of queued threads, we are still holding the // lock here so this is safe. @@ -653,9 +660,6 @@ bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx, return true; } -PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext) - : SchedulingTicketHolder(numTickets, 3, serviceContext) {} - void PriorityTicketHolder::_dequeueWaitingThread() { // There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since // 'kImmediate' operations should always bypass the need to queue. @@ -673,14 +677,14 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { BSONObjBuilder bbb(b.subobjStart("lowPriority")); auto& lowPriorityTicketStats = _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats(); - _appendPriorityStats(bbb, lowPriorityTicketStats); + appendCommonQueueImplStats(bbb, lowPriorityTicketStats); bbb.done(); } { BSONObjBuilder bbb(b.subobjStart("normalPriority")); auto& normalPriorityTicketStats = _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats(); - _appendPriorityStats(bbb, normalPriorityTicketStats); + appendCommonQueueImplStats(bbb, normalPriorityTicketStats); bbb.done(); } { @@ -702,27 +706,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { } } -void PriorityTicketHolder::_appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const { - auto removed = stats.totalRemovedQueue.loadRelaxed(); - auto added = stats.totalAddedQueue.loadRelaxed(); - - b.append("addedToQueue", added); - b.append("removedFromQueue", removed); - b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); - - auto finished = stats.totalFinishedProcessing.loadRelaxed(); - auto started = stats.totalStartedProcessing.loadRelaxed(); - b.append("startedProcessing", started); - b.append("processing", std::max(static_cast<int>(started - finished), 0)); - b.append("finishedProcessing", finished); - b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); - b.append("canceled", stats.totalCanceled.loadRelaxed()); - b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); - b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); -} - -SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse( - const AdmissionContext* admCtx) noexcept { +PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(const AdmissionContext* admCtx) { auto priority = admCtx->getPriority(); switch (priority) { case AdmissionContext::Priority::kLow: diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 60f5d167769..65cfddb753a 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -324,7 +324,21 @@ private: * Waiters will get placed in a specific internal queue according to some logic. * Releasers will wake up a waiter from a group chosen according to some logic. */ -class SchedulingTicketHolder : public TicketHolderWithQueueingStats { +class PriorityTicketHolder : public TicketHolderWithQueueingStats { +protected: +public: + explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext); + ~PriorityTicketHolder() override; + + int available() const override final; + + int queued() const override final; + + bool recordImmediateTicketStatistics() noexcept override final { + return true; + }; + +private: // Using a shared_mutex is fine here because usual considerations for avoiding them do not apply // in this case: // * Operations are short and do not block while holding the lock (i.e. they only do CPU-bound @@ -338,10 +352,10 @@ class SchedulingTicketHolder : public TicketHolderWithQueueingStats { using QueueMutex = std::shared_mutex; // NOLINT using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT -protected: + class Queue { public: - Queue(SchedulingTicketHolder* holder) : _holder(holder){}; + Queue(PriorityTicketHolder* holder) : _holder(holder){}; Queue(Queue&& other) : _queuedThreads(other._queuedThreads), @@ -378,28 +392,19 @@ protected: int _queuedThreads{0}; AtomicWord<int> _threadsToBeWoken{0}; stdx::condition_variable _cv; - SchedulingTicketHolder* _holder; + PriorityTicketHolder* _holder; QueueStats _stats; }; - std::vector<Queue> _queues; - -public: - explicit SchedulingTicketHolder(int numTickets, - unsigned int numQueues, - ServiceContext* serviceContext); - ~SchedulingTicketHolder() override; - - int available() const override final; - - int queued() const override final; - - bool recordImmediateTicketStatistics() noexcept override final { - return true; + enum class QueueType : unsigned int { + LowPriorityQueue = 0, + NormalPriorityQueue = 1, + // Exclusively used for statistics tracking. This queue should never have any processes + // 'queued'. + ImmediatePriorityNoOpQueue = 2, + QueueTypeSize = 3 }; -private: - bool _tryAcquireTicket(); boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; @@ -412,7 +417,11 @@ private: void _resize(int newSize, int oldSize) noexcept override final; - QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0; + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; + + void _appendImplStats(BSONObjBuilder& b) const override final; + + bool _tryAcquireTicket(); /** * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket. @@ -426,14 +435,14 @@ private: * - The number of items in each queue will not change during the execution * - No other thread will proceed to wait during the execution of the method */ - virtual void _dequeueWaitingThread() = 0; - - void _appendImplStats(BSONObjBuilder& b) const override = 0; + void _dequeueWaitingThread(); /** * Selects the queue to use for the current thread given the provided arguments. */ - virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0; + Queue& _getQueueToUse(const AdmissionContext* admCtx); + + std::vector<Queue> _queues; QueueMutex _queueMutex; AtomicWord<int> _ticketsAvailable; @@ -441,29 +450,6 @@ private: ServiceContext* _serviceContext; }; -class PriorityTicketHolder final : public SchedulingTicketHolder { -public: - explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext); - -private: - enum class QueueType : unsigned int { - LowPriorityQueue = 0, - NormalPriorityQueue = 1, - // Exclusively used for statistics tracking. This queue should never have any processes - // 'queued'. - ImmediatePriorityNoOpQueue = 2, - QueueTypeSize = 3 - }; - - void _dequeueWaitingThread() override final; - QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; - - void _appendImplStats(BSONObjBuilder& b) const override final; - void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const; - - Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final; -}; - /** * RAII-style movable token that gets generated when a ticket is acquired and is automatically * released when going out of scope. @@ -473,7 +459,7 @@ class Ticket { friend class ReaderWriterTicketHolder; friend class TicketHolderWithQueueingStats; friend class SemaphoreTicketHolder; - friend class SchedulingTicketHolder; + friend class PriorityTicketHolder; public: Ticket(Ticket&& t) : _ticketholder(t._ticketholder), _admissionContext(t._admissionContext) { |