diff options
author | Louis Williams <louis.williams@mongodb.com> | 2023-03-28 11:58:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-28 13:31:34 +0000 |
commit | 3e46a0ef81dfe95c11d9c5324fb86758b155363d (patch) | |
tree | 4765dcec524645178d080d79fca5db2ceb302a34 /src/mongo/util/concurrency | |
parent | a6ac9a1fe19da49b7067e802bf498d7fc0dcb5af (diff) | |
download | mongo-3e46a0ef81dfe95c11d9c5324fb86758b155363d.tar.gz |
SERVER-74778 Refactor and improve performance of PriorityTicketHolder
Diffstat (limited to 'src/mongo/util/concurrency')
18 files changed, 603 insertions, 677 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index e0315f88c3c..3a9380483ee 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -22,22 +22,12 @@ env.Library( ], ) -# TODO SERVER-72616: This can go away once TicketBroker is implemented in terms of atomic -# wait/notify in C++20. -if env.TargetOSIs('linux'): - env.Library( - target='ticket_broker', - source=['ticket_broker.cpp'], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - ], - ) - env.Library( target='ticketholder', source=[ 'priority_ticketholder.cpp' if env.TargetOSIs('linux') else [], 'semaphore_ticketholder.cpp', + 'ticket_pool.cpp' if env.TargetOSIs('linux') else [], 'ticketholder.cpp', ], LIBDEPS_PRIVATE=[ @@ -47,7 +37,6 @@ env.Library( '$BUILD_DIR/mongo/db/storage/storage_engine_feature_flags', '$BUILD_DIR/third_party/shim_boost', 'admission_context', - 'ticket_broker' if env.TargetOSIs('linux') else [] ], ) @@ -72,7 +61,7 @@ env.CppUnitTest( 'spin_lock_test.cpp', 'thread_pool_test.cpp', 'ticketholder_test_fixture.cpp', - 'ticket_broker_test.cpp' if env.TargetOSIs('linux') else [], + 'ticket_pool_test.cpp' if env.TargetOSIs('linux') else [], 'with_lock_test.cpp', ], LIBDEPS=[ @@ -81,7 +70,6 @@ env.CppUnitTest( 'spin_lock', 'thread_pool', 'thread_pool_test_fixture', - 'ticket_broker' if env.TargetOSIs('linux') else [], 'ticketholder', ], ) diff --git a/src/mongo/util/concurrency/admission_context.cpp b/src/mongo/util/concurrency/admission_context.cpp index 4873133cc71..5c8f8f28508 100644 --- a/src/mongo/util/concurrency/admission_context.cpp +++ b/src/mongo/util/concurrency/admission_context.cpp @@ -28,6 +28,7 @@ */ #include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/assert_util.h" namespace mongo { diff --git a/src/mongo/util/concurrency/admission_context.h b/src/mongo/util/concurrency/admission_context.h index 7ac97176555..0e52a5860e3 100644 --- a/src/mongo/util/concurrency/admission_context.h +++ b/src/mongo/util/concurrency/admission_context.h @@ -28,7 +28,7 @@ */ #pragma once -#include "mongo/db/concurrency/lock_manager_defs.h" +#include "mongo/base/string_data.h" #include "mongo/platform/atomic_word.h" #include "mongo/util/tick_source.h" @@ -84,14 +84,6 @@ public: return admissions.loadRelaxed(); } - void setLockMode(LockMode lockMode) { - _lockMode.store(lockMode); - } - - LockMode getLockMode() const { - return _lockMode.loadRelaxed(); - } - void setPriority(Priority priority) { _priority.store(priority); } @@ -108,7 +100,6 @@ private: // semantics. AtomicWord<TickSource::Tick> _startProcessingTime{0}; AtomicWord<int32_t> admissions{0}; - AtomicWord<LockMode> _lockMode{LockMode::MODE_NONE}; AtomicWord<Priority> _priority{Priority::kNormal}; }; diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp index cb3e6a3bb7c..aa8e581fcf2 100644 --- a/src/mongo/util/concurrency/priority_ticketholder.cpp +++ b/src/mongo/util/concurrency/priority_ticketholder.cpp @@ -41,61 +41,50 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault namespace mongo { + PriorityTicketHolder::PriorityTicketHolder(int32_t numTickets, int32_t lowPriorityBypassThreshold, ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext), - _lowPriorityBypassThreshold(lowPriorityBypassThreshold), - _ticketsAvailable(numTickets), - _serviceContext(serviceContext) {} + : TicketHolder(numTickets, serviceContext), _serviceContext(serviceContext) { + + auto queue = std::make_unique<SimplePriorityTicketQueue>(lowPriorityBypassThreshold); + _pool = std::make_unique<TicketPool>(numTickets, std::move(queue)); +} + +int32_t PriorityTicketHolder::available() const { + return _pool->available(); +} + +int64_t PriorityTicketHolder::queued() const { + return _pool->queued(); +} int64_t PriorityTicketHolder::numFinishedProcessing() const { return _stats[_enumToInt(QueueType::kLowPriority)].totalFinishedProcessing.load() + _stats[_enumToInt(QueueType::kNormalPriority)].totalFinishedProcessing.load(); } -void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold( - const int32_t& newBypassThreshold) { - stdx::unique_lock<stdx::mutex> growthLock(_growthMutex); - _lowPriorityBypassThreshold = newBypassThreshold; +int64_t PriorityTicketHolder::expedited() const { + return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->expedited(); +} + +int64_t PriorityTicketHolder::bypassed() const { + return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->bypassed(); +} + +void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) { + auto queue = static_cast<SimplePriorityTicketQueue*>(_pool->getQueue()); + queue->updateLowPriorityAdmissionBypassThreshold(newBypassThreshold); } boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { invariant(admCtx); - // Handed over tickets to queued waiters do not affect this path since they are not accounted - // for in the general ticketsAvailable counter. - auto hasAcquired = _tryAcquireTicket(); - if (hasAcquired) { - return Ticket{this, admCtx}; + if (_pool->tryAcquire()) { + return Ticket(this, admCtx); } - return boost::none; -} -TicketBroker::WaitingResult PriorityTicketHolder::_attemptToAcquireTicket( - TicketBroker& ticketBroker, Date_t deadline, Milliseconds maxWaitTime) { - // We are going to enter the broker as a waiter, so we must block releasers momentarily before - // registering ourselves as a waiter. Otherwise we risk missing a ticket. - stdx::unique_lock growthLock(_growthMutex); - // Check if a ticket became present in the general pool. This prevents a potential - // deadlock if the following were to happen without a tryAcquire: - // * Thread A proceeds to wait for a ticket to be handed over but before it acquires the - // growthLock gets descheduled. - // * Thread B releases a ticket, sees no waiters and releases to the general pool. - // * Thread A acquires the lock and proceeds to wait. - // - // In this scenario Thread A would spin indefinitely since it never picks up that there is a - // ticket in the general pool. It would wait until another thread comes in and hands over a - // ticket. - if (_tryAcquireTicket()) { - TicketBroker::WaitingResult result; - result.hasTimedOut = false; - result.hasTicket = true; - return result; - } - // We wait for a tiny bit before checking for interruption. - auto maxUntil = std::min(deadline, Date_t::now() + maxWaitTime); - return ticketBroker.attemptWaitForTicketUntil(std::move(growthLock), maxUntil); + return boost::none; } boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, @@ -103,79 +92,60 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC Date_t until) { invariant(admCtx); - auto queueType = _getQueueType(admCtx); - auto& ticketBroker = _getBroker(queueType); - while (true) { - // We attempt to acquire a ticket for a period of time. This may or may not succeed, in - // which case we will retry until timing out or getting interrupted. - auto waitingResult = _attemptToAcquireTicket(ticketBroker, until, Milliseconds{500}); + // To support interruptibility of ticket acquisition, we attempt to acquire a ticket for a + // maximum period of time. This may or may not succeed, in which case we will retry until + // timing out or getting interrupted. + auto maxUntil = std::min(until, Date_t::now() + Milliseconds(500)); + auto acquired = _pool->acquire(admCtx, maxUntil); ScopeGuard rereleaseIfTimedOutOrInterrupted([&] { // We may have gotten a ticket that we can't use, release it back to the ticket pool. - if (waitingResult.hasTicket) { - _releaseToTicketPoolImpl(admCtx); + if (acquired) { + _pool->release(); } }); + if (opCtx) { opCtx->checkForInterrupt(); } - auto hasTimedOut = waitingResult.hasTimedOut; - if (hasTimedOut && Date_t::now() > until) { + + if (acquired) { + rereleaseIfTimedOutOrInterrupted.dismiss(); + return Ticket(this, admCtx); + } else if (maxUntil == until) { + // We hit the end of our deadline, so return nothing. return boost::none; } - // We haven't been interrupted or timed out, so we may have a valid ticket present. - rereleaseIfTimedOutOrInterrupted.dismiss(); - if (waitingResult.hasTicket) { - return Ticket{this, admCtx}; - } + // We hit the periodic deadline, but are still within the caller's deadline, so retry. } + + return boost::none; } void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { // 'Immediate' priority operations should bypass the ticketing system completely. invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); - - // We will now proceed to perform dequeueing, we must acquire the growth mutex in order to - // prevent new enqueuers. - stdx::unique_lock growthLock(_growthMutex); - - auto hasWokenThread = _dequeueWaitingThread(growthLock); - if (!hasWokenThread) { - // There's no-one in the queue left to wake, so we give the ticket back for general - // availability. - _ticketsAvailable.addAndFetch(1); - } + _pool->release(); } void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept { auto difference = newSize - oldSize; if (difference > 0) { - // As we're adding tickets the waiting threads need to be notified that there are new - // tickets available. - stdx::unique_lock dequeuerLock(_growthMutex); - for (int32_t i = 0; i < difference; i++) { - auto hasWokenThread = _dequeueWaitingThread(dequeuerLock); - if (!hasWokenThread) { - // There's no-one in the brokers left to wake, so we give the ticket back for - // general availability. - _ticketsAvailable.addAndFetch(1); - } + // Hand out tickets one-by-one until we've given them all out. + for (auto remaining = difference; remaining > 0; remaining--) { + _pool->release(); } } else { AdmissionContext admCtx; - for (int32_t i = 0; i < std::abs(difference); i++) { - // This operation is uninterruptible as the resize operation is conceptually atomic. - // Cancelling the resize and leaving it in-between the old size and the new one is not - // allowed. - auto ticket = _waitForTicketUntilImpl(nullptr, &admCtx, Date_t::max()); - invariant(ticket); - ticket->discard(); + // Take tickets one-by-one without releasing. + for (auto remaining = -difference; remaining > 0; remaining--) { + _pool->acquire(&admCtx, Date_t::max()); } } } -TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( +TicketHolder::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( const AdmissionContext* admCtx) noexcept { auto queueType = _getQueueType(admCtx); return _stats[_enumToInt(queueType)]; @@ -196,61 +166,6 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { _appendCommonQueueImplStats(bbb, normalPriorityTicketStats); bbb.done(); } - b.append("immediatePriorityAdmissionsCount", getImmediatePriorityAdmissionsCount()); -} - -bool PriorityTicketHolder::_tryAcquireTicket() { - // Test, then test and set to avoid invalidating a cache line unncessarily. - if (_ticketsAvailable.load() <= 0) { - return false; - } - auto remaining = _ticketsAvailable.subtractAndFetch(1); - if (remaining < 0) { - _ticketsAvailable.addAndFetch(1); - return false; - } - return true; -} - -bool PriorityTicketHolder::_dequeueWaitingThread(const stdx::unique_lock<stdx::mutex>& growthLock) { - // There are only 2 possible brokers to transfer our ticket to - the low priority and normal - // priority brokers. There will never be anything to transfer to the immediate priority broker, - // given immediate priority operations will never wait for ticket admission. - auto& lowPriorityBroker = _getBroker(QueueType::kLowPriority); - auto& normalPriorityBroker = _getBroker(QueueType::kNormalPriority); - - // There is a guarantee that the number of waiters will not increase while holding the growth - // lock. This check is safe as long as we only compare it against an upper bound. - auto lowPrioWaiting = lowPriorityBroker.waitingThreads(growthLock); - auto normalPrioWaiting = normalPriorityBroker.waitingThreads(growthLock); - - if (lowPrioWaiting == 0 && normalPrioWaiting == 0) { - return false; - } - if (lowPrioWaiting == 0) { - return normalPriorityBroker.attemptToTransferTicket(growthLock); - } - if (normalPrioWaiting == 0) { - return lowPriorityBroker.attemptToTransferTicket(growthLock); - } - - // Both brokers are non-empty, and the low priority broker is bypassed for release in favor of - // the normal priority broker until the bypass threshold is met. - if (_lowPriorityBypassThreshold > 0 && - _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) { - if (lowPriorityBroker.attemptToTransferTicket(growthLock)) { - _expeditedLowPriorityAdmissions.addAndFetch(1); - return true; - } else { - return normalPriorityBroker.attemptToTransferTicket(growthLock); - } - } - - if (!normalPriorityBroker.attemptToTransferTicket(growthLock)) { - return lowPriorityBroker.attemptToTransferTicket(growthLock); - } else { - return true; - } } } // namespace mongo diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h index a46d1bca0b2..54c07e640b1 100644 --- a/src/mongo/util/concurrency/priority_ticketholder.h +++ b/src/mongo/util/concurrency/priority_ticketholder.h @@ -36,42 +36,62 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/mutex.h" -#include "mongo/util/concurrency/ticket_broker.h" +#include "mongo/util/concurrency/ticket_pool.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/time_support.h" namespace mongo { +namespace { +enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, NumQueues = 2 }; + +} // namespace + class Ticket; + /** - * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get - * placed in a specific internal queue according to some logic. Releasers will wake up a waiter - * from a group chosen according to some logic. - * - * MODIFICATIONS TO THIS CLASS MUST BE ACCOMPANIED BY AN UPDATE OF - * src/mongo/tla_plus/PriorityTicketHolder/MCPriorityTicketHolder.tla TO ENSURE IT IS CORRECT + * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority + * operations into separate queues. Normal priority operations are always scheduled ahead of low + * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter + * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead + * of waiting normal priority operations. */ -class PriorityTicketHolder : public TicketHolderWithQueueingStats { +class SimplePriorityTicketQueue : public TicketQueue { public: - explicit PriorityTicketHolder(int32_t numTickets, - int32_t lowPriorityBypassThreshold, - ServiceContext* serviceContext); - ~PriorityTicketHolder() override{}; + SimplePriorityTicketQueue(int lowPriorityBypassThreshold) + : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {} - int32_t available() const override final { - return _ticketsAvailable.load(); - }; + bool empty() const final { + return _normal.empty() && _low.empty(); + } - int64_t queued() const override final { - int64_t result = 0; - for (const auto& queue : _brokers) { - result += queue.waitingThreadsRelaxed(); + void push(std::shared_ptr<TicketWaiter> val) final { + if (val->context->getPriority() == AdmissionContext::Priority::kLow) { + _low.push(std::move(val)); + return; } - return result; + invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal); + _normal.push(std::move(val)); } - int64_t numFinishedProcessing() const override final; + std::shared_ptr<TicketWaiter> pop() final { + if (!_normal.empty() && !_low.empty() && _lowPriorityBypassThreshold.load() > 0 && + _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) { + auto front = std::move(_low.front()); + _low.pop(); + _expeditedLowPriorityAdmissions.addAndFetch(1); + return front; + } + if (!_normal.empty()) { + auto front = std::move(_normal.front()); + _normal.pop(); + return front; + } + auto front = std::move(_low.front()); + _low.pop(); + return front; + } /** * Number of times low priority operations are expedited for ticket admission over normal @@ -87,13 +107,61 @@ public: */ std::int64_t bypassed() const { return _lowPriorityBypassCount.loadRelaxed(); - }; + } - void updateLowPriorityAdmissionBypassThreshold(const int32_t& newBypassThreshold); + void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) { + _lowPriorityBypassThreshold.store(newBypassThreshold); + } private: - enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, NumQueues = 2 }; + /** + * Limits the number times the low priority queue is non-empty and bypassed in favor of the + * normal priority queue for the next ticket admission. + */ + AtomicWord<std::int32_t> _lowPriorityBypassThreshold; + + /** + * Number of times ticket admission is expedited for low priority operations. + */ + AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0}; + + /** + * Counts the number of times normal operations are dequeued over operations queued in the low + * priority queue. We explicitly use an unsigned type here because rollover is desired. + */ + AtomicWord<std::uint64_t> _lowPriorityBypassCount{0}; + + std::queue<std::shared_ptr<TicketWaiter>> _normal; + std::queue<std::shared_ptr<TicketWaiter>> _low; +}; + +/** + * A PriorityTicketHolder supports queueing and prioritization of operations based on + * AdmissionContext::Priority. + * + * MODIFICATIONS TO THIS CLASS MUST BE ACCOMPANIED BY AN UPDATE OF + * src/mongo/tla_plus/PriorityTicketHolder/MCPriorityTicketHolder.tla TO ENSURE IT IS CORRECT + */ +class PriorityTicketHolder : public TicketHolder { +public: + explicit PriorityTicketHolder(int32_t numTickets, + int32_t lowPriorityBypassThreshold, + ServiceContext* serviceContext); + ~PriorityTicketHolder() override{}; + + int32_t available() const override final; + + int64_t queued() const override final; + + int64_t numFinishedProcessing() const override final; + + std::int64_t expedited() const; + std::int64_t bypassed() const; + + void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold); + +private: boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, @@ -108,39 +176,6 @@ private: void _appendImplStats(BSONObjBuilder& b) const override final; - bool _tryAcquireTicket(); - - TicketBroker::WaitingResult _attemptToAcquireTicket(TicketBroker& ticketBroker, - Date_t deadline, - Milliseconds maxWaitTime); - - /** - * Wakes up a waiting thread (if it exists) and hands-over the current ticket. - * Implementors MUST wake at least one waiting thread if at least one thread is waiting in any - * of the brokers. In other words, attemptToTransferTicket on each non-empty TicketBroker must - * be called until either it returns true at least once or has been called on all brokers. - * - * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block. - * We risk stalling all other operations otherwise. - * - * When called the following invariants will be held: - * - Successive checks to the number of waiting threads in a TicketBroker will always be <= the - * previous value. That is, no new waiters can come in. - * - Calling TicketBroker::attemptToTransferTicket will always return false if it has previously - * returned false. Successive calls can change the result from true to false, but never the - * reverse. - */ - bool _dequeueWaitingThread(const stdx::unique_lock<stdx::mutex>& growthLock); - - static unsigned int _enumToInt(QueueType queueType) { - return static_cast<unsigned int>(queueType); - } - - TicketBroker& _getBroker(QueueType queueType) { - return _brokers[_enumToInt(queueType)]; - } - - static QueueType _getQueueType(const AdmissionContext* admCtx) { auto priority = admCtx->getPriority(); switch (priority) { @@ -153,35 +188,14 @@ private: } } - // This mutex is meant to be used in order to grow the queue or to prevent it from doing so. - // - // We use an stdx::mutex here because we want to minimize overhead as much as possible. Using a - // normal mongo::Mutex would add some unnecessary metrics counters. Additionally we need this - // type as it is part of the TicketBroker API in order to avoid misuse. - stdx::mutex _growthMutex; // NOLINT + static unsigned int _enumToInt(QueueType queueType) { + return static_cast<std::underlying_type_t<QueueType>>(queueType); + } - std::array<TicketBroker, static_cast<unsigned int>(QueueType::NumQueues)> _brokers; std::array<QueueStats, static_cast<unsigned int>(QueueType::NumQueues)> _stats; - /** - * Limits the number times the low priority queue is non-empty and bypassed in favor of the - * normal priority queue for the next ticket admission. - * - * Updates must be done under the _growthMutex. - */ - int32_t _lowPriorityBypassThreshold; - - /** - * Counts the number of times normal operations are dequeued over operations queued in the low - * priority queue. We explicitly use an unsigned type here because rollover is desired. - */ - AtomicWord<std::uint64_t> _lowPriorityBypassCount{0}; + std::unique_ptr<TicketPool> _pool; - /** - * Number of times ticket admission is expedited for low priority operations. - */ - AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0}; - AtomicWord<int32_t> _ticketsAvailable; ServiceContext* _serviceContext; }; } // namespace mongo diff --git a/src/mongo/util/concurrency/priority_ticketholder_test.cpp b/src/mongo/util/concurrency/priority_ticketholder_test.cpp index e5cfa66891c..fff70a982ec 100644 --- a/src/mongo/util/concurrency/priority_ticketholder_test.cpp +++ b/src/mongo/util/concurrency/priority_ticketholder_test.cpp @@ -33,6 +33,7 @@ #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/priority_ticketholder.h" #include "mongo/util/concurrency/ticketholder_test_fixture.h" +#include "mongo/util/periodic_runner_factory.h" #include "mongo/util/tick_source_mock.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest @@ -75,31 +76,42 @@ void assertSoon(std::function<bool()> predicate, Milliseconds timeout = kWaitTim } } -class PriorityTicketHolderTest : public TicketHolderTestFixture {}; +class PriorityTicketHolderTest : public TicketHolderTestFixture { +public: + void setUp() override { + TicketHolderTestFixture::setUp(); + + auto tickSource = std::make_unique<TickSourceMock<Microseconds>>(); + _tickSource = tickSource.get(); + getServiceContext()->setTickSource(std::move(tickSource)); + } + + TickSourceMock<Microseconds>* getTickSource() { + return _tickSource; + } + +private: + TickSourceMock<Microseconds>* _tickSource; +}; TEST_F(PriorityTicketHolderTest, BasicTimeoutPriority) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); basicTimeout(_opCtx.get(), - std::make_unique<PriorityTicketHolder>( - 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext)); + std::make_unique<PriorityTicketHolder>(1 /* tickets */, + kDefaultLowPriorityAdmissionBypassThreshold, + getServiceContext())); } TEST_F(PriorityTicketHolderTest, ResizeStatsPriority) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - resizeTest(_opCtx.get(), - std::make_unique<PriorityTicketHolder>( - 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext), - tickSource); + std::make_unique<PriorityTicketHolder>(1 /* tickets */, + kDefaultLowPriorityAdmissionBypassThreshold, + getServiceContext()), + getTickSource()); } TEST_F(PriorityTicketHolderTest, PriorityTwoQueuedOperations) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + PriorityTicketHolder holder( + 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext()); Stats stats(&holder); @@ -170,9 +182,8 @@ TEST_F(PriorityTicketHolderTest, PriorityTwoQueuedOperations) { TEST_F(PriorityTicketHolderTest, OnlyLowPriorityOps) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + PriorityTicketHolder holder( + 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext()); Stats stats(&holder); // This mutex is to avoid data race conditions between checking for the ticket state and setting @@ -289,9 +300,8 @@ TEST_F(PriorityTicketHolderTest, OnlyLowPriorityOps) { } TEST_F(PriorityTicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + PriorityTicketHolder holder( + 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext()); Stats stats(&holder); { @@ -377,10 +387,8 @@ TEST_F(PriorityTicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { } TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + PriorityTicketHolder holder( + 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext()); Stats stats(&holder); MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow); @@ -416,7 +424,7 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) { }); } - tickSource->advance(Microseconds(100)); + getTickSource()->advance(Microseconds(100)); lowPriorityAdmission.ticket.reset(); while (holder.queued() > 0) { @@ -424,7 +432,7 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) { } barrierAcquiredTicket.countDownAndWait(); - tickSource->advance(Microseconds(200)); + getTickSource()->advance(Microseconds(200)); barrierReleaseTicket.countDownAndWait(); waiting.join(); @@ -473,10 +481,8 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) { } TEST_F(PriorityTicketHolderTest, PriorityCanceled) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + PriorityTicketHolder holder( + 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext()); Stats stats(&holder); { MockAdmission lowPriorityAdmission(this->getServiceContext(), @@ -497,7 +503,7 @@ TEST_F(PriorityTicketHolderTest, PriorityCanceled) { // Wait for thread to take ticket. } - tickSource->advance(Microseconds(100)); + getTickSource()->advance(Microseconds(100)); waiting.join(); } @@ -532,10 +538,8 @@ TEST_F(PriorityTicketHolderTest, PriorityCanceled) { } TEST_F(PriorityTicketHolderTest, LowPriorityExpedited) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); auto lowPriorityBypassThreshold = 2; - PriorityTicketHolder holder(1, lowPriorityBypassThreshold, &serviceContext); + PriorityTicketHolder holder(1 /* tickets */, lowPriorityBypassThreshold, getServiceContext()); Stats stats(&holder); // Use the GlobalServiceContext to create MockAdmissions. @@ -617,4 +621,10 @@ TEST_F(PriorityTicketHolderTest, LowPriorityExpedited) { ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), queuedNormalAdmissionsCount + 1); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); } + +TEST_F(PriorityTicketHolderTest, Interruption) { + interruptTest(_opCtx.get(), + std::make_unique<PriorityTicketHolder>(1 /* tickets */, 0, getServiceContext())); +} + } // namespace diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.cpp b/src/mongo/util/concurrency/semaphore_ticketholder.cpp index 1099b9d7cb6..31969a6ad11 100644 --- a/src/mongo/util/concurrency/semaphore_ticketholder.cpp +++ b/src/mongo/util/concurrency/semaphore_ticketholder.cpp @@ -82,7 +82,7 @@ void tsFromDate(const Date_t& deadline, struct timespec& ts) { } // namespace SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext) { + : TicketHolder(numTickets, serviceContext) { check(sem_init(&_sem, 0, numTickets)); } @@ -160,7 +160,7 @@ void SemaphoreTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept { #else SemaphoreTicketHolder::SemaphoreTicketHolder(int32_t numTickets, ServiceContext* svcCtx) - : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {} + : TicketHolder(numTickets, svcCtx), _numTickets(numTickets) {} SemaphoreTicketHolder::~SemaphoreTicketHolder() = default; diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.h b/src/mongo/util/concurrency/semaphore_ticketholder.h index d2e6ef1f257..45e5d80d7cc 100644 --- a/src/mongo/util/concurrency/semaphore_ticketholder.h +++ b/src/mongo/util/concurrency/semaphore_ticketholder.h @@ -46,7 +46,7 @@ namespace mongo { -class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats { +class SemaphoreTicketHolder final : public TicketHolder { public: explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext); ~SemaphoreTicketHolder() override final; diff --git a/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp b/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp index 241e6f622e8..d8cc00f4ef5 100644 --- a/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp +++ b/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp @@ -50,4 +50,12 @@ TEST_F(SemaphoreTicketHolderTest, ResizeStatsSemaphore) { resizeTest( _opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext), tickSource); } + +TEST_F(SemaphoreTicketHolderTest, Interruption) { + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + interruptTest(_opCtx.get(), + std::make_unique<SemaphoreTicketHolder>(1 /* tickets */, getServiceContext())); +} + } // namespace diff --git a/src/mongo/util/concurrency/ticket_broker.h b/src/mongo/util/concurrency/ticket_broker.h deleted file mode 100644 index fbff94d06e2..00000000000 --- a/src/mongo/util/concurrency/ticket_broker.h +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#pragma once - -#include "mongo/platform/atomic_word.h" -#include "mongo/platform/mutex.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -/** - * A ticket broker between threads waiting (Ticket-waiters) and threads willing to give a ticket - * (Ticket-releasers). - * - * This broker requires external synchronisation (growthLock) so that it can function correctly. The - * methods that require synchronisation have their signature written so that it enforces correct - * usage. Using it with a different mutex in two separate calls is undefined behaviour and will - * almost certainly cause a deadlock or segmentation fault. - * - * This class is to be used when more than one broker is necessary in a given scope and certain - * guarantees have to be made. Given the following conditions for usage: - * - * - Ticket-waiters must acquire the growthLock in order to enter the broker. - * - Ticket-releasers must acquire the growthLock in order to transfer their ticket to a waiter in - * the broker. - * - * Then this class can provide the following guarantees across the multiple instances of it in - * scope: - * - * - Ticket-releasers can attempt to transfer their ticket to each broker only once. No other waiter - * will appear between attempts. - * - Ticket-waiters will get scheduled for execution once transferred a ticket without having to - * acquire any mutex. - * - * This is useful for example if you need to build a scheduler based on top of multiple brokers - * representing different groups of operations. The ticket-releasers can have "snapshot" guarantees - * of the state of the system and choose who to transfer the ticket to based on some arbitrary - * logic. - * - * Note that the implementation allows granular thread selection for wakeup but we've chosen to use - * FIFO semantics to make the critical section as short lived as possible. - */ -class TicketBroker { -public: - TicketBroker(); - - struct WaitingResult { - bool hasTimedOut; - bool hasTicket; - }; - - /** - * Attempts to wait for a ticket until it reaches the specified timeout. The return type will - * contain whether the attempt was successful and/or if it timed out. - * - * This method consumes the lock since it will internally unlock it. Only locking again if the - * call times out in order to remove the thread from the waiting list. - */ - WaitingResult attemptWaitForTicketUntil(stdx::unique_lock<stdx::mutex> growthLock, - Date_t until) noexcept; - - /** - * Transfers the ticket if there is a thread to transfer it to. Returns true if the ticket was - * transferred. - * - * Guarantee: No modifications to the internal linked list will take place while holding the - * lock. - */ - bool attemptToTransferTicket(const stdx::unique_lock<stdx::mutex>& growthLock) noexcept; - - /** - * Returns the number of threads waiting. - * - * - * This value will be consistent while holding the growthLock. - */ - int32_t waitingThreads(const stdx::unique_lock<stdx::mutex>& growthLock) const noexcept { - return _numQueued.load(); - } - - /** - * Same as 'waitingThreads()' but without strict ordering and consistency guarantees. - * - * This method is meant for monitoring and tests only. The value is a snapshot of the system at - * the moment of calling the method. It will potentially be out of date as soon as it returns. - */ - int32_t waitingThreadsRelaxed() const noexcept { - return _numQueued.loadRelaxed(); - } - -private: - /** - * Node structure of the linked list, it has to be a doubly linked list in order to allow - * random removal of nodes. Lifetime of these nodes will reside in the stack memory of the - * thread waiting. - */ - struct Node { - Node* previous{nullptr}; - AtomicWord<uint32_t> futexWord{0}; - Node* next{nullptr}; - }; - - // Append the node to the linked list. - void _registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept; - - // Removes the node from the linked list. - void _unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept; - - /** - * Edge nodes of the linked list. To append we need to know the end of the list in order to make - * appends O(1) instead of O(n). - */ - Node* _queueBegin; - Node* _queueEnd; - - /** - * Number of queued threads in the linked list. - */ - AtomicWord<int32_t> _numQueued; -}; - -} // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_broker.cpp b/src/mongo/util/concurrency/ticket_pool.cpp index 1e9c4ba403c..dfda5ebcf00 100644 --- a/src/mongo/util/concurrency/ticket_broker.cpp +++ b/src/mongo/util/concurrency/ticket_pool.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2022-present MongoDB, Inc. + * Copyright (C) 2023-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,18 +27,21 @@ * it in the license file. */ -#include "mongo/util/concurrency/ticket_broker.h" -#include "mongo/logv2/log.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/util/errno_util.h" - #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault +#include "ticket_pool.h" + // TODO SERVER-72616: Remove futex usage from this class in favour of atomic waits. #include <linux/futex.h> /* Definition of FUTEX_* constants */ #include <sys/syscall.h> /* Definition of SYS_* constants */ #include <unistd.h> +#include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/errno_util.h" + namespace mongo { namespace { static int futex(uint32_t* uaddr, @@ -108,115 +111,87 @@ static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept { } } // namespace -TicketBroker::TicketBroker() : _queueBegin(nullptr), _queueEnd(nullptr), _numQueued(0) {} - -void TicketBroker::_registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, - Node& node) noexcept { - // We register the node. - _numQueued.fetchAndAdd(1); - - if (_queueBegin == nullptr) { - // If the list is empty we are the first node. - _queueBegin = &node; - _queueEnd = &node; - } else { - // Otherwise we're the new end and must link the preceding node to us, and us to the - // preceding node. - _queueEnd->next = &node; - node.previous = _queueEnd; - _queueEnd = &node; +TicketPool::TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue) + : _available(numTickets), _queued(0), _waiters(std::move(queue)) {} + +bool TicketPool::tryAcquire() { + auto available = _available.load(); + bool gotTicket = false; + while (available > 0 && !gotTicket) { + gotTicket = _available.compareAndSwap(&available, available - 1); } + return gotTicket; } -void TicketBroker::_unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, - Node& node) noexcept { - // We've been unregistered by a ticket transfer, nothing to do as the transferer already removed - // us. - if (node.futexWord.load() != 0) { - return; - } +bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) { + auto waiter = std::make_shared<TicketWaiter>(); + waiter->context = admCtx; - auto previousLength = _numQueued.fetchAndSubtract(1); - // If there was only 1 node it was us, the queue will now be empty. - if (previousLength == 1) { - _queueBegin = _queueEnd = nullptr; - return; - } - // If the beginning of the linked list is this node we advance it to the next element. - if (_queueBegin == &node) { - _queueBegin = node.next; - node.next->previous = nullptr; - return; + { + stdx::unique_lock<Mutex> lk(_mutex); + // It is important to check for a ticket one more time before queueing, as a ticket may have + // just become available. + if (tryAcquire()) { + return true; + } + _queued.addAndFetch(1); + _waiters->push(waiter); } - // If the end of the queue is this node, then the new end is the preceding node. - if (_queueEnd == &node) { - _queueEnd = node.previous; - node.previous->next = nullptr; - return; + auto res = atomic_wait(waiter->futexWord, TicketWaiter::State::Waiting, deadline); + if (res == stdx::cv_status::timeout) { + // If we timed out, we need to invalidate ourselves, but ensure that we take a ticket if + // it was given. + auto state = static_cast<uint32_t>(TicketWaiter::State::Waiting); + if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::TimedOut)) { + // Successfully set outselves to timed out so nobody tries to give us a ticket. + return false; + } else { + // We were given a ticket anyways. We must take it. + invariant(state == TicketWaiter::State::Acquired); + return true; + } } - - // Otherwise we're in the middle of the list. Preceding and successive nodes must be updated - // accordingly. - node.previous->next = node.next; - node.next->previous = node.previous; + invariant(waiter->futexWord.load() == TicketWaiter::State::Acquired); + return true; } -TicketBroker::WaitingResult TicketBroker::attemptWaitForTicketUntil( - stdx::unique_lock<stdx::mutex> growthLock, Date_t until) noexcept { - // Stack allocate the node of the linked list, this approach lets us ignore heap memory in - // favour of stack memory which is dramatically cheaper. Care must be taken to ensure that there - // are no references left in the queue to this node once returning from the method. - // - // If std::promise didn't perform a heap allocation we could use it here. - Node node; - - // We add ourselves as a waiter, we are still holding the lock here. - _registerAsWaiter(growthLock, node); - - // Finished modifying the linked list, the lock can be released now. - growthLock.unlock(); - - // We now wait until obtaining the notification via the futex word. - auto waitResult = atomic_wait(node.futexWord, 0, until); - bool hasTimedOut = waitResult == stdx::cv_status::timeout; - - if (hasTimedOut) { - // Timing out implies that the node must be removed from the list, block list modifications - // to prevent segmentation faults. - growthLock.lock(); - _unregisterAsWaiter(growthLock, node); - growthLock.unlock(); +std::shared_ptr<TicketWaiter> TicketPool::_popWaiterOrAddTicketToPool() { + stdx::unique_lock<Mutex> lock(_mutex); + if (_waiters->empty()) { + // We need to ensure we add the ticket back to the pool while holding the mutex. This + // prevents a soon-to-be waiter from missing an available ticket. Otherwise, we could + // leave a waiter in the queue without ever waking it. + _available.addAndFetch(1); + return nullptr; } + auto waiter = _waiters->pop(); + _queued.subtractAndFetch(1); + return waiter; +} - // If we haven't timed out it means that the ticket has been transferred to our node. The - // transfer method removes the node from the linked list, so there's no cleanup to be done. - - auto hasTicket = node.futexWord.load() != 0; +void TicketPool::_release() { + while (auto waiter = _popWaiterOrAddTicketToPool()) { + auto state = static_cast<uint32_t>(TicketWaiter::State::Waiting); + if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::Acquired)) { + atomic_notify_one(waiter->futexWord); + return; + } else { + // We raced with the waiter timing out, so we didn't transfer the ticket. Try again. + invariant(state == TicketWaiter::State::TimedOut); + } + } +} - return TicketBroker::WaitingResult{hasTimedOut, hasTicket}; +void TicketPool::release() { + _release(); } -bool TicketBroker::attemptToTransferTicket( - const stdx::unique_lock<stdx::mutex>& growthLock) noexcept { - // We can only transfer a ticket if there is a thread waiting for it. - if (_numQueued.load() > 0) { - _numQueued.fetchAndSubtract(1); - - // We notify the first element in the queue. To avoid race conditions we first remove the - // node and then notify the waiting thread. Doing the opposite risks a segmentation fault if - // the node gets deallocated before we remove it from the list. - auto node = _queueBegin; - _queueBegin = node->next; - if (_queueBegin) { - // Next node isn't empty, we must inform it that it's first in line. - _queueBegin->previous = nullptr; - } - auto& futexAtomic = node->futexWord; - futexAtomic.store(1); - // We've transferred a ticket and removed the node from the list, inform the waiting thread - // that it can proceed. - atomic_notify_one(futexAtomic); +bool TicketPool::releaseIfWaiters() { + // This is prone to race conditions, but is intended as a fast-path to avoid taking the mutex + // unnecessarily. + if (_queued.load()) { + _release(); return true; } return false; diff --git a/src/mongo/util/concurrency/ticket_pool.h b/src/mongo/util/concurrency/ticket_pool.h new file mode 100644 index 00000000000..4cdfaf9dce7 --- /dev/null +++ b/src/mongo/util/concurrency/ticket_pool.h @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <list> + +#include "mongo/platform/atomic_word.h" +#include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * A ticket waiter represents an operation that queues when no tickets are available. + */ +struct TicketWaiter { + enum State : uint32_t { + // This is the initial state. May transition to only Acquired or TimedOut. + Waiting = 0, + // A releaser will set the waiter to the Acquired state when a ticket is available. This is + // a terminal state. + Acquired, + // The waiter will transition to this state when it times out. Releasers will not give + // tickets to waiters in the TimedOut state. This is a terminal state. + TimedOut, + }; + AtomicWord<uint32_t> futexWord{Waiting}; + + // Only valid to dereference when in the Waiting state and while holding the queue lock. + AdmissionContext* context{nullptr}; +}; + +/** + * A TicketQueue is an interface that represents a queue of waiters whose ordering is + * implementation-defined. + */ +class TicketQueue { +public: + virtual ~TicketQueue(){}; + virtual bool empty() const = 0; + virtual void push(std::shared_ptr<TicketWaiter>) = 0; + virtual std::shared_ptr<TicketWaiter> pop() = 0; +}; + +/** + * The FifoTicketQueue is a simple FIFO queue where new waiters are placed at the end and the oldest + * waiters are removed first. + */ +class FifoTicketQueue : public TicketQueue { +public: + bool empty() const { + return _queue.empty(); + } + + void push(std::shared_ptr<TicketWaiter> val) { + _queue.push_back(std::move(val)); + } + + std::shared_ptr<TicketWaiter> pop() { + auto front = std::move(_queue.front()); + _queue.pop_front(); + return front; + } + +private: + std::list<std::shared_ptr<TicketWaiter>> _queue; +}; + +/** + * A TicketPool holds tickets and queues waiters in the provided TicketQueue. The TicketPool + * attempts to emulate a semaphore with a custom queueing policy. + * + * All public functions are thread-safe except where explicitly stated otherwise. + */ +class TicketPool { +public: + TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue); + + /** + * Attempt to acquire a ticket without blocking. Returns true if a ticket was granted. + */ + bool tryAcquire(); + + /** + * Acquire a ticket until the provided deadline. Returns false on timeout, true otherwise. + */ + bool acquire(AdmissionContext* admCtx, Date_t deadline); + + /** + * Releases a ticket to the pool. Will will wake a waiter, if there are any queued operations. + */ + void release(); + + /** + * If there are queued operations, releases a ticket and returns true. Otherwise, does nothing + * and returns false. + */ + bool releaseIfWaiters(); + + /** + * Returns the number of tickets available. + */ + int32_t available() const { + return _available.load(); + } + + /** + * Returns the number of queued waiters. + */ + int32_t queued() const { + return _queued.load(); + } + + /* + * Provides direct access to the underlying queue. Callers must ensure they only use thread-safe + * functions. + */ + TicketQueue* getQueue() { + return _waiters.get(); + } + +private: + /** + * Attempt to give the ticket to a waiter, and otherwise the pool. + */ + void _release(); + + /** + * Removes the next waiter from the queue. If there are no waiters, adds the ticket to the pool. + * Ensures that no new waiters queue while this is happening. + */ + std::shared_ptr<TicketWaiter> _popWaiterOrAddTicketToPool(); + + AtomicWord<int32_t> _available; + + // This counter is redundant with the _waiters queue length, but provides the releaseIfWaiters() + // a fast-path that avoids taking the queue mutex. + AtomicWord<int32_t> _queued; + + // This mutex protects the _waiters queue by preventing items from being added and removed, but + // does not protect the elements of the queue. + Mutex _mutex = MONGO_MAKE_LATCH("TicketPool::_mutex"); + std::unique_ptr<TicketQueue> _waiters; +}; +} // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_broker_test.cpp b/src/mongo/util/concurrency/ticket_pool_test.cpp index 20008a721bb..051ac3a3e00 100644 --- a/src/mongo/util/concurrency/ticket_broker_test.cpp +++ b/src/mongo/util/concurrency/ticket_pool_test.cpp @@ -35,7 +35,7 @@ #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/ticket_broker.h" +#include "mongo/util/concurrency/ticket_pool.h" #include "mongo/util/duration.h" #include "mongo/util/timer.h" @@ -76,55 +76,37 @@ static inline const Milliseconds kSleepTime{1}; } \ } -TEST(TicketBrokerTest, BasicTimeout) { - TicketBroker broker; - - stdx::mutex brokerMutex; // NOLINT +TEST(TicketPoolTest, BasicTimeout) { + TicketPool pool(0, std::make_unique<FifoTicketQueue>()); { - stdx::unique_lock lk(brokerMutex); - auto result = - broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Milliseconds{10}); - ASSERT_FALSE(result.hasTicket); - ASSERT_TRUE(result.hasTimedOut); + AdmissionContext ctx; + ASSERT_FALSE(pool.acquire(&ctx, Date_t::now() + Milliseconds{10})); } { - stdx::unique_lock lk(brokerMutex); - auto result = broker.attemptWaitForTicketUntil(std::move(lk), Date_t::min()); - ASSERT_FALSE(result.hasTicket); - ASSERT_TRUE(result.hasTimedOut); + AdmissionContext admCtx; + ASSERT_FALSE(pool.acquire(&admCtx, Date_t::min())); } } -TEST(TicketBrokerTest, HandOverWorks) { - TicketBroker broker; - - stdx::mutex brokerMutex; // NOLINT +TEST(TicketPoolTest, HandOverWorks) { + TicketPool pool(0, std::make_unique<FifoTicketQueue>()); { - { - stdx::unique_lock growthLock(brokerMutex); - ASSERT_FALSE(broker.attemptToTransferTicket(growthLock)); - } + ASSERT_FALSE(pool.tryAcquire()); stdx::thread waitingThread([&] { - stdx::unique_lock lk(brokerMutex); - auto result = - broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10}); - ASSERT_TRUE(result.hasTicket); - ASSERT_FALSE(result.hasTimedOut); + AdmissionContext ctx; + ASSERT_TRUE(pool.acquire(&ctx, Date_t::now() + Seconds{10})); }); assertSoon([&] { - ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 1); + ASSERT_SOON_EXP(pool.queued() == 1); return true; }); - { - stdx::unique_lock growthLock(brokerMutex); - ASSERT_TRUE(broker.attemptToTransferTicket(growthLock)); - } + { ASSERT_TRUE(pool.releaseIfWaiters()); } waitingThread.join(); } @@ -136,23 +118,19 @@ TEST(TicketBrokerTest, HandOverWorks) { for (int i = 0; i < threadsToTest; i++) { threads.emplace_back([&] { - stdx::unique_lock lk(brokerMutex); - auto result = - broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10}); - ASSERT_TRUE(result.hasTicket); - ASSERT_FALSE(result.hasTimedOut); + AdmissionContext ctx; + ASSERT_TRUE(pool.acquire(&ctx, Date_t::now() + Seconds{10})); pendingThreads.subtractAndFetch(1); }); } assertSoon([&] { - ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 10); + ASSERT_SOON_EXP(pool.queued() == 10); return true; }); for (int i = 1; i <= threadsToTest; i++) { - stdx::unique_lock growthLock(brokerMutex); - ASSERT_TRUE(broker.attemptToTransferTicket(growthLock)); + ASSERT_TRUE(pool.releaseIfWaiters()); assertSoon([&] { ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i); return true; diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index d755b665626..03ef2ec90a4 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -43,7 +43,7 @@ namespace mongo { namespace { void updateQueueStatsOnRelease(ServiceContext* serviceContext, - TicketHolderWithQueueingStats::QueueStats& queueStats, + TicketHolder::QueueStats& queueStats, AdmissionContext* admCtx) { queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1); auto startTime = admCtx->getStartProcessingTime(); @@ -53,7 +53,7 @@ void updateQueueStatsOnRelease(ServiceContext* serviceContext, } void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, - TicketHolderWithQueueingStats::QueueStats& queueStats, + TicketHolder::QueueStats& queueStats, AdmissionContext* admCtx) { if (admCtx->getAdmissions() == 0) { queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); @@ -63,34 +63,34 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, } } // namespace -void TicketHolderWithQueueingStats::resize(int32_t newSize) noexcept { +void TicketHolder::resize(int32_t newSize) noexcept { stdx::lock_guard<Latch> lk(_resizeMutex); _resize(newSize, _outof.load()); _outof.store(newSize); } -void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const { +void TicketHolder::appendStats(BSONObjBuilder& b) const { b.append("out", used()); b.append("available", available()); b.append("totalTickets", outof()); + b.append("immediatePriorityAdmissionsCount", getImmediatePriorityAdmissionsCount()); _appendImplStats(b); } -void TicketHolderWithQueueingStats::_releaseToTicketPool(AdmissionContext* admCtx) noexcept { +void TicketHolder::_releaseToTicketPool(AdmissionContext* admCtx) noexcept { auto& queueStats = _getQueueStatsToUse(admCtx); updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx); _releaseToTicketPoolImpl(admCtx); } -Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx, - AdmissionContext* admCtx) { +Ticket TicketHolder::waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) { auto res = waitForTicketUntil(opCtx, admCtx, Date_t::max()); invariant(res); return std::move(*res); } -boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionContext* admCtx) { +boost::optional<Ticket> TicketHolder::tryAcquire(AdmissionContext* admCtx) { // 'kImmediate' operations should always bypass the ticketing system. invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); auto ticket = _tryAcquireImpl(admCtx); @@ -104,9 +104,9 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionConte } -boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until) { +boost::optional<Ticket> TicketHolder::waitForTicketUntil(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until) { invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); // Attempt a quick acquisition first. @@ -145,11 +145,11 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat } } -int32_t TicketHolderWithQueueingStats::getAndResetPeakUsed() { +int32_t TicketHolder::getAndResetPeakUsed() { return _peakUsed.swap(used()); } -void TicketHolderWithQueueingStats::_updatePeakUsed() { +void TicketHolder::_updatePeakUsed() { if (!feature_flags::gFeatureFlagExecutionControl.isEnabledAndIgnoreFCV()) { return; } @@ -161,8 +161,7 @@ void TicketHolderWithQueueingStats::_updatePeakUsed() { } } -void TicketHolderWithQueueingStats::_appendCommonQueueImplStats(BSONObjBuilder& b, - const QueueStats& stats) const { +void TicketHolder::_appendCommonQueueImplStats(BSONObjBuilder& b, const QueueStats& stats) const { auto removed = stats.totalRemovedQueue.loadRelaxed(); auto added = stats.totalAddedQueue.loadRelaxed(); @@ -195,6 +194,16 @@ boost::optional<Ticket> MockTicketHolder::waitForTicketUntil(OperationContext*, return {}; } +boost::optional<Ticket> MockTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { + return boost::none; +} + +boost::optional<Ticket> MockTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until) { + return boost::none; +} + void MockTicketHolder::setUsed(int32_t used) { _used = used; if (_used > _peakUsed) { diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index e2325120385..6eb66bbc0b5 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -53,25 +53,27 @@ class TicketHolder { friend class Ticket; public: + TicketHolder(int32_t numTickets, ServiceContext* svcCtx) + : _outof(numTickets), _serviceContext(svcCtx){}; virtual ~TicketHolder(){}; /** * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'. */ - virtual void resize(int32_t newSize) noexcept {}; + virtual void resize(int32_t newSize) noexcept; /** * Attempts to acquire a ticket without blocking. - * Returns a boolean indicating whether the operation was successful or not. + * Returns a ticket if one is available, and boost::none otherwise. */ - virtual boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) = 0; + virtual boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx); /** * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext * 'opCtx' is killed, throwing an AssertionException. If no OperationContext is provided, then * the operation is uninterruptible. */ - virtual Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) = 0; + virtual Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx); /** * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is @@ -81,96 +83,66 @@ public: */ virtual boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, AdmissionContext* admCtx, - Date_t until) = 0; - - virtual void appendStats(BSONObjBuilder& b) const = 0; - - /** - * 'Immediate' admissions don't need to acquire or wait for a ticket. However, they should - * report to the TicketHolder for tracking purposes. - * - * Increments the count of 'immediate' priority admissions reported. - */ - virtual void reportImmediatePriorityAdmission() = 0; + Date_t until); /** - * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket - * pool. + * The total number of tickets allotted to the ticket pool. */ - virtual int32_t available() const = 0; + virtual int32_t outof() const { + return _outof.loadRelaxed(); + } /** * Instantaneous number of tickets that are checked out by an operation. */ - virtual int32_t used() const = 0; + virtual int32_t used() const { + return outof() - available(); + } /** * Peak number of tickets checked out at once since the previous time this function was called. */ - virtual int32_t getAndResetPeakUsed() = 0; + virtual int32_t getAndResetPeakUsed(); /** - * The total number of tickets allotted to the ticket pool. + * 'Immediate' admissions don't need to acquire or wait for a ticket. However, they should + * report to the TicketHolder for tracking purposes. + * + * Increments the count of 'immediate' priority admissions reported. */ - virtual int32_t outof() const = 0; + virtual void reportImmediatePriorityAdmission() { + _immediatePriorityAdmissionsCount.fetchAndAdd(1); + } /** - * The total number of operations that acquired a ticket, completed their work, and released the - * ticket. + * Returns the number of 'immediate' priority admissions, which always bypass ticket + * acquisition. */ - virtual int64_t numFinishedProcessing() const = 0; + int64_t getImmediatePriorityAdmissionsCount() const { + return _immediatePriorityAdmissionsCount.loadRelaxed(); + } + + virtual void appendStats(BSONObjBuilder& b) const; -private: /** - * Releases a ticket back into the ticketing pool. + * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket + * pool. */ - virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept = 0; -}; - -/** - * A ticketholder which manages both aggregate and policy specific queueing statistics. - */ -class TicketHolderWithQueueingStats : public TicketHolder { -public: - TicketHolderWithQueueingStats(int32_t numTickets, ServiceContext* svcCtx) - : _outof(numTickets), _serviceContext(svcCtx){}; - - ~TicketHolderWithQueueingStats() override{}; - - boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override; - - Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) override; - - boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until) override; + virtual int32_t available() const = 0; /** - * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'. + * Instantaneous number of operations waiting in queue for a ticket. + * + * TODO SERVER-74082: Once the SemaphoreTicketHolder is removed, consider changing this metric + * to int32_t. */ - void resize(int32_t newSize) noexcept override; - - int32_t used() const override { - return outof() - available(); - } - - int32_t outof() const override { - return _outof.loadRelaxed(); - } + virtual int64_t queued() const = 0; /** - * Returns the number of 'immediate' priority admissions, which always bypass ticket - * acquisition. + * The total number of operations that acquired a ticket, completed their work, and released the + * ticket. */ - int64_t getImmediatePriorityAdmissionsCount() const { - return _immediatePriorityAdmissionsCount.loadRelaxed(); - } - - void reportImmediatePriorityAdmission() override final { - _immediatePriorityAdmissionsCount.fetchAndAdd(1); - } - - void appendStats(BSONObjBuilder& b) const override; + virtual int64_t numFinishedProcessing() const = 0; /** * Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a @@ -188,18 +160,13 @@ public: AtomicWord<std::int64_t> totalTimeQueuedMicros{0}; }; - int32_t getAndResetPeakUsed() override; - +private: /** - * Instantaneous number of operations waiting in queue for a ticket. - * - * TODO SERVER-74082: Once the SemaphoreTicketHolder is removed, consider changing this metric - * to int32_t. + * Releases a ticket back into the ticketing pool. */ - virtual int64_t queued() const = 0; + virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept; -private: - void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final; + virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0; virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0; @@ -207,11 +174,9 @@ private: AdmissionContext* admCtx, Date_t until) = 0; - virtual void _appendImplStats(BSONObjBuilder& b) const = 0; + virtual void _appendImplStats(BSONObjBuilder& b) const {} - virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0; - - virtual void _resize(int32_t newSize, int32_t oldSize) noexcept = 0; + virtual void _resize(int32_t newSize, int32_t oldSize) noexcept {} /** * Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue @@ -221,8 +186,8 @@ private: void _updatePeakUsed(); - Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), - "TicketHolderWithQueueingStats::_resizeMutex"); + Mutex _resizeMutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "TicketHolder::_resizeMutex"); AtomicWord<int32_t> _outof; AtomicWord<int32_t> _peakUsed; AtomicWord<std::int64_t> _immediatePriorityAdmissionsCount; @@ -238,6 +203,8 @@ protected: class MockTicketHolder : public TicketHolder { public: + MockTicketHolder() : TicketHolder(0, nullptr) {} + void resize(int32_t newSize) noexcept override { _outof = newSize; } @@ -274,15 +241,32 @@ public: _outof = outof; } + int64_t queued() const override { + return 0; + } + int64_t numFinishedProcessing() const override { return _numFinishedProcessing; } + void setNumFinishedProcessing(int32_t numFinishedProcessing) { _numFinishedProcessing = numFinishedProcessing; } private: - void _releaseToTicketPool(AdmissionContext*) noexcept override {} + void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override {} + + boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override; + + boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until) override; + + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override { + return _stats; + } + + QueueStats _stats; int32_t _used = 0; int32_t _peakUsed = 0; @@ -296,7 +280,6 @@ private: */ class Ticket { friend class TicketHolder; - friend class TicketHolderWithQueueingStats; friend class SemaphoreTicketHolder; friend class PriorityTicketHolder; friend class MockTicketHolder; diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp index 04c5277a0a9..a3f1c767bdc 100644 --- a/src/mongo/util/concurrency/ticketholder_bm.cpp +++ b/src/mongo/util/concurrency/ticketholder_bm.cpp @@ -45,7 +45,7 @@ namespace mongo { namespace { -static int kTickets = 128; +static int kTickets = 32; static int kThreadMin = 16; static int kThreadMax = 1024; static int kLowPriorityAdmissionBypassThreshold = 100; @@ -183,6 +183,7 @@ void BM_acquireAndRelease(benchmark::State& state) { BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder, AdmissionsPriority::kNormal) ->Threads(kThreadMin) ->Threads(kTickets) + ->Threads(128) ->Threads(kThreadMax); // TODO SERVER-72616: Remove ifdefs once PriorityTicketHolder is available cross-platform. @@ -191,6 +192,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder, AdmissionsPriori BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kNormal) ->Threads(kThreadMin) ->Threads(kTickets) + ->Threads(128) ->Threads(kThreadMax); // Low priority operations are expected to take longer to acquire a ticket because they are forced @@ -198,6 +200,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriorit BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kLow) ->Threads(kThreadMin) ->Threads(kTickets) + ->Threads(128) ->Threads(kThreadMax); // This benchmark is intended for comparisons between different iterations of the @@ -209,6 +212,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriorit BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kNormalAndLow) ->Threads(kThreadMin) ->Threads(kTickets) + ->Threads(128) ->Threads(kThreadMax); #endif diff --git a/src/mongo/util/concurrency/ticketholder_test_fixture.cpp b/src/mongo/util/concurrency/ticketholder_test_fixture.cpp index 539c9666fae..35057522e11 100644 --- a/src/mongo/util/concurrency/ticketholder_test_fixture.cpp +++ b/src/mongo/util/concurrency/ticketholder_test_fixture.cpp @@ -42,7 +42,7 @@ void TicketHolderTestFixture::setUp() { } void TicketHolderTestFixture::basicTimeout(OperationContext* opCtx, - std::unique_ptr<TicketHolderWithQueueingStats> holder) { + std::unique_ptr<TicketHolder> holder) { ASSERT_EQ(holder->used(), 0); ASSERT_EQ(holder->available(), 1); ASSERT_EQ(holder->outof(), 1); @@ -68,7 +68,7 @@ void TicketHolderTestFixture::basicTimeout(OperationContext* opCtx, } void TicketHolderTestFixture::resizeTest(OperationContext* opCtx, - std::unique_ptr<TicketHolderWithQueueingStats> holder, + std::unique_ptr<TicketHolder> holder, TickSourceMock<Microseconds>* tickSource) { Stats stats(holder.get()); @@ -133,4 +133,25 @@ void TicketHolderTestFixture::resizeTest(OperationContext* opCtx, ASSERT_FALSE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds(1))); } +void TicketHolderTestFixture::interruptTest(OperationContext* opCtx, + std::unique_ptr<TicketHolder> holder) { + holder->resize(0); + + auto waiter = stdx::thread([&]() { + AdmissionContext admCtx; + ASSERT_THROWS_CODE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::max()), + DBException, + ErrorCodes::Interrupted); + }); + + while (!holder->queued()) { + } + + ASSERT_EQ(holder->used(), 0); + ASSERT_EQ(holder->available(), 0); + + opCtx->markKilled(); + waiter.join(); +} + } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder_test_fixture.h b/src/mongo/util/concurrency/ticketholder_test_fixture.h index 096a6a64da7..b358665918a 100644 --- a/src/mongo/util/concurrency/ticketholder_test_fixture.h +++ b/src/mongo/util/concurrency/ticketholder_test_fixture.h @@ -47,23 +47,28 @@ namespace mongo { * target the legacy SemaphoreTicketHolder. */ class TicketHolderTestFixture : public ServiceContextTest { +public: void setUp() override; protected: class Stats; struct MockAdmission; - void basicTimeout(OperationContext* opCtx, - std::unique_ptr<TicketHolderWithQueueingStats> holder); + void basicTimeout(OperationContext* opCtx, std::unique_ptr<TicketHolder> holder); /** * Tests that TicketHolder::resize() does not impact metrics outside of those related to the * number of tickets available(), used(), and outof(). */ void resizeTest(OperationContext* opCtx, - std::unique_ptr<TicketHolderWithQueueingStats> holder, + std::unique_ptr<TicketHolder> holder, TickSourceMock<Microseconds>* tickSource); + /** + * Tests that ticket acquisition is interruptible. + */ + void interruptTest(OperationContext* opCtx, std::unique_ptr<TicketHolder> holder); + ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; }; |