From 059bcbe2df11d12249e53263f28dd12dbc185f79 Mon Sep 17 00:00:00 2001 From: Louis Williams Date: Fri, 28 Apr 2023 14:21:36 +0000 Subject: SERVER-76653 PriorityTicketholder perf improvements --- .../util/concurrency/priority_ticketholder.cpp | 32 +++--- src/mongo/util/concurrency/priority_ticketholder.h | 89 +-------------- src/mongo/util/concurrency/ticket_pool.cpp | 40 +++---- src/mongo/util/concurrency/ticket_pool.h | 124 +++++++++++++++++---- src/mongo/util/concurrency/ticket_pool_test.cpp | 8 +- 5 files changed, 142 insertions(+), 151 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp index aa8e581fcf2..4656c7ab4f4 100644 --- a/src/mongo/util/concurrency/priority_ticketholder.cpp +++ b/src/mongo/util/concurrency/priority_ticketholder.cpp @@ -45,18 +45,16 @@ namespace mongo { PriorityTicketHolder::PriorityTicketHolder(int32_t numTickets, int32_t lowPriorityBypassThreshold, ServiceContext* serviceContext) - : TicketHolder(numTickets, serviceContext), _serviceContext(serviceContext) { - - auto queue = std::make_unique(lowPriorityBypassThreshold); - _pool = std::make_unique(numTickets, std::move(queue)); -} + : TicketHolder(numTickets, serviceContext), + _serviceContext(serviceContext), + _pool(numTickets, lowPriorityBypassThreshold) {} int32_t PriorityTicketHolder::available() const { - return _pool->available(); + return _pool.available(); } int64_t PriorityTicketHolder::queued() const { - return _pool->queued(); + return _pool.queued(); } int64_t PriorityTicketHolder::numFinishedProcessing() const { @@ -65,22 +63,22 @@ int64_t PriorityTicketHolder::numFinishedProcessing() const { } int64_t PriorityTicketHolder::expedited() const { - return static_cast(_pool->getQueue())->expedited(); + return _pool.getQueue().expedited(); } int64_t PriorityTicketHolder::bypassed() const { - return static_cast(_pool->getQueue())->bypassed(); + return _pool.getQueue().bypassed(); } void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) { - auto queue = static_cast(_pool->getQueue()); - queue->updateLowPriorityAdmissionBypassThreshold(newBypassThreshold); + auto& queue = _pool.getQueue(); + queue.updateLowPriorityAdmissionBypassThreshold(newBypassThreshold); } boost::optional PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { invariant(admCtx); - if (_pool->tryAcquire()) { + if (_pool.tryAcquire()) { return Ticket(this, admCtx); } @@ -97,11 +95,11 @@ boost::optional PriorityTicketHolder::_waitForTicketUntilImpl(OperationC // maximum period of time. This may or may not succeed, in which case we will retry until // timing out or getting interrupted. auto maxUntil = std::min(until, Date_t::now() + Milliseconds(500)); - auto acquired = _pool->acquire(admCtx, maxUntil); + auto acquired = _pool.acquire(admCtx, maxUntil); ScopeGuard rereleaseIfTimedOutOrInterrupted([&] { // We may have gotten a ticket that we can't use, release it back to the ticket pool. if (acquired) { - _pool->release(); + _pool.release(); } }); @@ -125,7 +123,7 @@ boost::optional PriorityTicketHolder::_waitForTicketUntilImpl(OperationC void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { // 'Immediate' priority operations should bypass the ticketing system completely. invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); - _pool->release(); + _pool.release(); } void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept { @@ -134,13 +132,13 @@ void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept { if (difference > 0) { // Hand out tickets one-by-one until we've given them all out. for (auto remaining = difference; remaining > 0; remaining--) { - _pool->release(); + _pool.release(); } } else { AdmissionContext admCtx; // Take tickets one-by-one without releasing. for (auto remaining = -difference; remaining > 0; remaining--) { - _pool->acquire(&admCtx, Date_t::max()); + _pool.acquire(&admCtx, Date_t::max()); } } } diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h index 54c07e640b1..8d36e930758 100644 --- a/src/mongo/util/concurrency/priority_ticketholder.h +++ b/src/mongo/util/concurrency/priority_ticketholder.h @@ -50,91 +50,6 @@ enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, Num class Ticket; -/** - * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority - * operations into separate queues. Normal priority operations are always scheduled ahead of low - * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter - * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead - * of waiting normal priority operations. - */ -class SimplePriorityTicketQueue : public TicketQueue { -public: - SimplePriorityTicketQueue(int lowPriorityBypassThreshold) - : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {} - - bool empty() const final { - return _normal.empty() && _low.empty(); - } - - void push(std::shared_ptr val) final { - if (val->context->getPriority() == AdmissionContext::Priority::kLow) { - _low.push(std::move(val)); - return; - } - invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal); - _normal.push(std::move(val)); - } - - std::shared_ptr pop() final { - if (!_normal.empty() && !_low.empty() && _lowPriorityBypassThreshold.load() > 0 && - _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) { - auto front = std::move(_low.front()); - _low.pop(); - _expeditedLowPriorityAdmissions.addAndFetch(1); - return front; - } - if (!_normal.empty()) { - auto front = std::move(_normal.front()); - _normal.pop(); - return front; - } - auto front = std::move(_low.front()); - _low.pop(); - return front; - } - - /** - * Number of times low priority operations are expedited for ticket admission over normal - * priority operations. - */ - std::int64_t expedited() const { - return _expeditedLowPriorityAdmissions.loadRelaxed(); - } - - /** - * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the - * normal priority queue when a ticket becomes available. - */ - std::int64_t bypassed() const { - return _lowPriorityBypassCount.loadRelaxed(); - } - - void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) { - _lowPriorityBypassThreshold.store(newBypassThreshold); - } - -private: - /** - * Limits the number times the low priority queue is non-empty and bypassed in favor of the - * normal priority queue for the next ticket admission. - */ - AtomicWord _lowPriorityBypassThreshold; - - /** - * Number of times ticket admission is expedited for low priority operations. - */ - AtomicWord _expeditedLowPriorityAdmissions{0}; - - /** - * Counts the number of times normal operations are dequeued over operations queued in the low - * priority queue. We explicitly use an unsigned type here because rollover is desired. - */ - AtomicWord _lowPriorityBypassCount{0}; - - std::queue> _normal; - std::queue> _low; -}; - /** * A PriorityTicketHolder supports queueing and prioritization of operations based on * AdmissionContext::Priority. @@ -193,9 +108,7 @@ private: } std::array(QueueType::NumQueues)> _stats; - - std::unique_ptr _pool; - ServiceContext* _serviceContext; + TicketPool _pool; }; } // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_pool.cpp b/src/mongo/util/concurrency/ticket_pool.cpp index dfda5ebcf00..ca82fdb9884 100644 --- a/src/mongo/util/concurrency/ticket_pool.cpp +++ b/src/mongo/util/concurrency/ticket_pool.cpp @@ -111,10 +111,8 @@ static void atomic_notify_one(AtomicWord& atomic) noexcept { } } // namespace -TicketPool::TicketPool(int numTickets, std::unique_ptr queue) - : _available(numTickets), _queued(0), _waiters(std::move(queue)) {} - -bool TicketPool::tryAcquire() { +template +bool TicketPool::tryAcquire() { auto available = _available.load(); bool gotTicket = false; while (available > 0 && !gotTicket) { @@ -123,7 +121,8 @@ bool TicketPool::tryAcquire() { return gotTicket; } -bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) { +template +bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) { auto waiter = std::make_shared(); waiter->context = admCtx; @@ -134,9 +133,9 @@ bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) { if (tryAcquire()) { return true; } - _queued.addAndFetch(1); - _waiters->push(waiter); + _waiters.push(waiter); } + _queued.addAndFetch(1); auto res = atomic_wait(waiter->futexWord, TicketWaiter::State::Waiting, deadline); if (res == stdx::cv_status::timeout) { @@ -156,22 +155,23 @@ bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) { return true; } -std::shared_ptr TicketPool::_popWaiterOrAddTicketToPool() { +template +std::shared_ptr TicketPool::_popWaiterOrAddTicketToPool() { stdx::unique_lock lock(_mutex); - if (_waiters->empty()) { + auto waiter = _waiters.pop(); + if (!waiter) { // We need to ensure we add the ticket back to the pool while holding the mutex. This // prevents a soon-to-be waiter from missing an available ticket. Otherwise, we could // leave a waiter in the queue without ever waking it. _available.addAndFetch(1); - return nullptr; } - auto waiter = _waiters->pop(); - _queued.subtractAndFetch(1); return waiter; } -void TicketPool::_release() { +template +void TicketPool::_release() { while (auto waiter = _popWaiterOrAddTicketToPool()) { + _queued.subtractAndFetch(1); auto state = static_cast(TicketWaiter::State::Waiting); if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::Acquired)) { atomic_notify_one(waiter->futexWord); @@ -183,18 +183,12 @@ void TicketPool::_release() { } } -void TicketPool::release() { +template +void TicketPool::release() { _release(); } -bool TicketPool::releaseIfWaiters() { - // This is prone to race conditions, but is intended as a fast-path to avoid taking the mutex - // unnecessarily. - if (_queued.load()) { - _release(); - return true; - } - return false; -} +template class TicketPool; +template class TicketPool; } // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_pool.h b/src/mongo/util/concurrency/ticket_pool.h index 4cdfaf9dce7..19797da4f70 100644 --- a/src/mongo/util/concurrency/ticket_pool.h +++ b/src/mongo/util/concurrency/ticket_pool.h @@ -29,7 +29,7 @@ #pragma once -#include +#include #include "mongo/platform/atomic_word.h" #include "mongo/util/concurrency/admission_context.h" @@ -64,7 +64,6 @@ struct TicketWaiter { class TicketQueue { public: virtual ~TicketQueue(){}; - virtual bool empty() const = 0; virtual void push(std::shared_ptr) = 0; virtual std::shared_ptr pop() = 0; }; @@ -75,33 +74,122 @@ public: */ class FifoTicketQueue : public TicketQueue { public: - bool empty() const { - return _queue.empty(); - } - void push(std::shared_ptr val) { - _queue.push_back(std::move(val)); + _queue.push(std::move(val)); } std::shared_ptr pop() { + if (_queue.empty()) { + return nullptr; + } auto front = std::move(_queue.front()); - _queue.pop_front(); + _queue.pop(); return front; } private: - std::list> _queue; + std::queue> _queue; }; +/** + * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority + * operations into separate queues. Normal priority operations are always scheduled ahead of low + * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter + * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead + * of waiting normal priority operations. + */ +class SimplePriorityTicketQueue : public TicketQueue { +public: + SimplePriorityTicketQueue(int lowPriorityBypassThreshold) + : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {} + + void push(std::shared_ptr val) final { + if (val->context->getPriority() == AdmissionContext::Priority::kLow) { + _low.push(std::move(val)); + return; + } + invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal); + _normal.push(std::move(val)); + } + + std::shared_ptr pop() final { + auto normalQueued = !_normal.empty(); + auto lowQueued = !_low.empty(); + if (!normalQueued && !lowQueued) { + return nullptr; + } + if (normalQueued && lowQueued && _lowPriorityBypassThreshold.load() > 0 && + _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) { + auto front = std::move(_low.front()); + _low.pop(); + _expeditedLowPriorityAdmissions.addAndFetch(1); + return front; + } + if (normalQueued) { + auto front = std::move(_normal.front()); + _normal.pop(); + return front; + } + auto front = std::move(_low.front()); + _low.pop(); + return front; + } + + /** + * Number of times low priority operations are expedited for ticket admission over normal + * priority operations. + */ + std::int64_t expedited() const { + return _expeditedLowPriorityAdmissions.loadRelaxed(); + } + + /** + * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the + * normal priority queue when a ticket becomes available. + */ + std::int64_t bypassed() const { + return _lowPriorityBypassCount.loadRelaxed(); + } + + void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) { + _lowPriorityBypassThreshold.store(newBypassThreshold); + } + +private: + /** + * Limits the number times the low priority queue is non-empty and bypassed in favor of the + * normal priority queue for the next ticket admission. + */ + AtomicWord _lowPriorityBypassThreshold; + + /** + * Number of times ticket admission is expedited for low priority operations. + */ + AtomicWord _expeditedLowPriorityAdmissions{0}; + + /** + * Counts the number of times normal operations are dequeued over operations queued in the low + * priority queue. We explicitly use an unsigned type here because rollover is desired. + */ + AtomicWord _lowPriorityBypassCount{0}; + + std::queue> _normal; + std::queue> _low; +}; + + /** * A TicketPool holds tickets and queues waiters in the provided TicketQueue. The TicketPool * attempts to emulate a semaphore with a custom queueing policy. * * All public functions are thread-safe except where explicitly stated otherwise. */ +template class TicketPool { public: - TicketPool(int numTickets, std::unique_ptr queue); + template + TicketPool(int numTickets, Args&&... args) + : _available(numTickets), _queued(0), _waiters(std::forward(args)...) {} /** * Attempt to acquire a ticket without blocking. Returns true if a ticket was granted. @@ -118,12 +206,6 @@ public: */ void release(); - /** - * If there are queued operations, releases a ticket and returns true. Otherwise, does nothing - * and returns false. - */ - bool releaseIfWaiters(); - /** * Returns the number of tickets available. */ @@ -142,8 +224,12 @@ public: * Provides direct access to the underlying queue. Callers must ensure they only use thread-safe * functions. */ - TicketQueue* getQueue() { - return _waiters.get(); + const Queue& getQueue() const { + return _waiters; + } + + Queue& getQueue() { + return _waiters; } private: @@ -167,6 +253,6 @@ private: // This mutex protects the _waiters queue by preventing items from being added and removed, but // does not protect the elements of the queue. Mutex _mutex = MONGO_MAKE_LATCH("TicketPool::_mutex"); - std::unique_ptr _waiters; + Queue _waiters; }; } // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_pool_test.cpp b/src/mongo/util/concurrency/ticket_pool_test.cpp index 051ac3a3e00..80c88e809d4 100644 --- a/src/mongo/util/concurrency/ticket_pool_test.cpp +++ b/src/mongo/util/concurrency/ticket_pool_test.cpp @@ -77,7 +77,7 @@ static inline const Milliseconds kSleepTime{1}; } TEST(TicketPoolTest, BasicTimeout) { - TicketPool pool(0, std::make_unique()); + TicketPool pool(0); { AdmissionContext ctx; @@ -91,7 +91,7 @@ TEST(TicketPoolTest, BasicTimeout) { } TEST(TicketPoolTest, HandOverWorks) { - TicketPool pool(0, std::make_unique()); + TicketPool pool(0); { ASSERT_FALSE(pool.tryAcquire()); @@ -106,7 +106,7 @@ TEST(TicketPoolTest, HandOverWorks) { return true; }); - { ASSERT_TRUE(pool.releaseIfWaiters()); } + pool.release(); waitingThread.join(); } @@ -130,7 +130,7 @@ TEST(TicketPoolTest, HandOverWorks) { }); for (int i = 1; i <= threadsToTest; i++) { - ASSERT_TRUE(pool.releaseIfWaiters()); + pool.release(); assertSoon([&] { ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i); return true; -- cgit v1.2.1