diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-11-08 10:37:02 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-08 11:10:15 +0000 |
commit | 25329194fe4343fd4ef1f3423377da742a47d5d6 (patch) | |
tree | a80644d0d7faf7172fb64c7dfa850dfe4728249b /src/mongo/util | |
parent | 5e1d12241178f6c6520d57476552b362b5ddf237 (diff) | |
download | mongo-25329194fe4343fd4ef1f3423377da742a47d5d6.tar.gz |
SERVER-70927 Make PriorityTicketHolder select where to dequeue
Diffstat (limited to 'src/mongo/util')
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 145 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 71 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_bm.cpp | 10 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_params.idl | 44 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 100 |
6 files changed, 179 insertions, 200 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index 51be550e81d..dfcc09d89d5 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -23,14 +23,6 @@ env.Library( ) env.Library( - target='ticketholder_params', - source=['ticketholder_params.idl'], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/server_base', - ], -) - -env.Library( target='ticketholder', source=['ticketholder.cpp'], LIBDEPS_PRIVATE=[ @@ -38,7 +30,6 @@ env.Library( '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/third_party/shim_boost', 'admission_context', - 'ticketholder_params', ], ) diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index 2e9e90eeec1..235ac15fe70 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -33,7 +33,6 @@ #include "mongo/db/service_context.h" #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/ticketholder.h" -#include "mongo/util/concurrency/ticketholder_params_gen.h" #include <iostream> @@ -379,13 +378,15 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { } #endif -PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext) +PriorityTicketHolder::PriorityTicketHolder(int numTickets, + int lowPriorityBypassThreshold, + ServiceContext* serviceContext) : TicketHolderWithQueueingStats(numTickets, serviceContext), _queues{Queue(this, QueueType::kLowPriority), Queue(this, QueueType::kNormalPriority), Queue(this, QueueType::kImmediatePriority)}, + _lowPriorityBypassThreshold(lowPriorityBypassThreshold), _serviceContext(serviceContext) { - _ticketsAvailable.store(numTickets); _enqueuedElements.store(0); } @@ -400,8 +401,14 @@ int PriorityTicketHolder::queued() const { return _enqueuedElements.loadRelaxed(); } -std::int64_t PriorityTicketHolder::promoted() const { - return _promotedElements.loadRelaxed(); +void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold( + const int& newBypassThreshold) { + UniqueLockGuard uniqueQueueLock(_queueMutex); + _lowPriorityBypassThreshold = newBypassThreshold; +} + +std::int64_t PriorityTicketHolder::expedited() const { + return _expeditedLowPriorityAdmissions.loadRelaxed(); } std::int64_t PriorityTicketHolder::bypassed() const { @@ -423,14 +430,9 @@ void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) no // // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on // them, only waking threads is allowed. - ReleaserLockGuard releaserLock(_queueMutex); // NOLINT + SharedLockGuard sharedQueueLock(_queueMutex); _ticketsAvailable.addAndFetch(1); - if (std::all_of(_queues.begin(), _queues.end(), [](const Queue& queue) { - return queue.queuedElems() == 0; - })) { - return; - } - _dequeueWaitingThread(releaserLock); + _dequeueWaitingThread(sharedQueueLock); } bool PriorityTicketHolder::_tryAcquireTicket() { @@ -465,33 +467,24 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC auto queueType = _queueType(admCtx); auto& queue = _getQueue(queueType); - AdmissionStatus admissionStatus; + bool assigned; { - EnqueuerLockGuard enqueuerLock(_queueMutex); + UniqueLockGuard uniqueQueueLock(_queueMutex); _enqueuedElements.addAndFetch(1); ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - admissionStatus = queue.enqueue(opCtx, enqueuerLock, until, waitMode); - - if (admissionStatus == AdmissionStatus::kNeedsPromotion) { - invariant(queueType == QueueType::kLowPriority); - - _promotedElements.fetchAndAdd(1); - auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); - admissionStatus = normalPriorityQueue.enqueue(opCtx, enqueuerLock, until, waitMode); - } + assigned = queue.enqueue(opCtx, uniqueQueueLock, until, waitMode); } - if (admissionStatus == AdmissionStatus::kReadyToAcquire) { + if (assigned) { return Ticket{this, admCtx}; } else { return boost::none; } } -bool PriorityTicketHolder::_hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, - QueueType queue) { +bool PriorityTicketHolder::_hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue) { switch (queue) { case QueueType::kLowPriority: { const auto& normalQueue = _getQueue(QueueType::kNormalPriority); @@ -511,9 +504,9 @@ void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { if (difference > 0) { // As we're adding tickets the waiting threads need to be notified that there are new // tickets available. - ReleaserLockGuard releaserLock(_queueMutex); + SharedLockGuard sharedQueueLock(_queueMutex); for (int i = 0; i < difference; i++) { - _dequeueWaitingThread(releaserLock); + _dequeueWaitingThread(sharedQueueLock); } } @@ -521,7 +514,7 @@ void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { // have to wait until the current ticket holders release their tickets. } -bool PriorityTicketHolder::Queue::attemptToDequeue(const ReleaserLockGuard& releaserLock) { +bool PriorityTicketHolder::Queue::attemptToDequeue(const SharedLockGuard& sharedQueueLock) { auto threadsToBeWoken = _threadsToBeWoken.load(); while (threadsToBeWoken < _queuedThreads) { auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1); @@ -533,7 +526,7 @@ bool PriorityTicketHolder::Queue::attemptToDequeue(const ReleaserLockGuard& rele return false; } -void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& enqueuerLock) { +void PriorityTicketHolder::Queue::_signalThreadWoken(const UniqueLockGuard& uniqueQueueLock) { auto currentThreadsToBeWoken = _threadsToBeWoken.load(); while (currentThreadsToBeWoken > 0) { if (_threadsToBeWoken.compareAndSwap(¤tThreadsToBeWoken, @@ -543,11 +536,10 @@ void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& en } } -PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue( - OperationContext* opCtx, - EnqueuerLockGuard& enqueuerLock, - const Date_t& until, - WaitMode waitMode) { +bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx, + UniqueLockGuard& uniqueQueueLock, + const Date_t& until, + WaitMode waitMode) { _queuedThreads++; // Before exiting we remove ourselves from the count of queued threads, we are still holding the // lock here so this is safe. @@ -568,70 +560,61 @@ PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue( // woken after the condition variable wait, not before which is where the predicate would // go. while (_holder->_ticketsAvailable.load() <= 0 || - _holder->_hasToWaitForHigherPriority(enqueuerLock, _queueType)) { + _holder->_hasToWaitForHigherPriority(uniqueQueueLock, _queueType)) { // This method must be called after getting woken in all cases, so we use a ScopeGuard // to handle exceptions as well as early returns. - ON_BLOCK_EXIT([&] { _signalThreadWoken(enqueuerLock); }); + ON_BLOCK_EXIT([&] { _signalThreadWoken(uniqueQueueLock); }); auto waitResult = - clockSource->waitForConditionUntil(_cv, enqueuerLock, deadline, baton); + clockSource->waitForConditionUntil(_cv, uniqueQueueLock, deadline, baton); // We check if the operation has been interrupted (timeout, killed, etc.) here. if (waitMode == WaitMode::kInterruptible) { opCtx->checkForInterrupt(); } if (waitResult == stdx::cv_status::timeout) - return AdmissionStatus::kInterrupted; - - bool needsPromotion = _signalPromotion.swap(false); - if (needsPromotion) { - // The thread is woken for promotion, which is different than a thread woken to - // acquire a ticket / a product of an interruption. - return PriorityTicketHolder::AdmissionStatus::kNeedsPromotion; - } + return false; } } while (!_holder->_tryAcquireTicket()); - return AdmissionStatus::kReadyToAcquire; -} - -void PriorityTicketHolder::Queue::signalPromoteSingleOp(const ReleaserLockGuard& releaserLock) { - // Only signal a promotion if there exists an operation to promote. - // - // Additionally, only modify _signalPromotion if it was false to begin. This prevents the - // following race (as multiple releasers may call this code concurrently): - // . ReleaserA attempts to dequeue, succeeds, _signalPromotion is set to true - // . ReleaserB attempts to dequeue, fails, _signalPromotion is set to false before the - // woken thread has a chance to check whether it should be promoted. - // - // - bool permittedOriginalValue = false; - _signalPromotion.compareAndSwap(&permittedOriginalValue, attemptToDequeue(releaserLock)); + return true; } -void PriorityTicketHolder::_dequeueWaitingThread(const ReleaserLockGuard& releaserLock) { - // There should never be anything to dequeue from 'QueueType::kImmediatePriority' since - // 'kImmediate' operations should always bypass the need to queue. +void PriorityTicketHolder::_dequeueWaitingThread(const SharedLockGuard& sharedQueueLock) { + // There are only 2 possible queues to dequeue from - the low priority and normal priority + // queues. There will never be anything to dequeue from the immediate priority queue, given + // immediate priority operations will never wait for ticket admission. + auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); - if (!normalPriorityQueue.attemptToDequeue(releaserLock)) { - _getQueue(QueueType::kLowPriority).attemptToDequeue(releaserLock); + + // There is a guarantee that the number of queued elements cannot change while holding the + // shared queue lock. + auto lowQueueCount = lowPriorityQueue.queuedElems(); + auto normalQueueCount = normalPriorityQueue.queuedElems(); + + if (lowQueueCount == 0 && normalQueueCount == 0) { return; } - - auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); - if (lowPriorityQueue.queuedElems() == 0) { - // Dequeueing from the normal queue didn't bypass any operations waiting in the low priority - // queue, return early. + if (lowQueueCount == 0) { + normalPriorityQueue.attemptToDequeue(sharedQueueLock); + return; + } + if (normalQueueCount == 0) { + lowPriorityQueue.attemptToDequeue(sharedQueueLock); return; } - // To prevent starvation, record the number of times operations in the low priority queue are - // bypassed in favor of dequeueing from the normal queue. If operations are bypassed enough, it - // may be time to promote a low priority operation to give it a chance to eventually run. - // - // A value of 0 implies low priority operations are never to be promoted. - if (auto lowPriorityOperationPromotionRate = gLowPriorityOperationPromotionRate.load(); - lowPriorityOperationPromotionRate > 0) { - if (_lowPriorityBypassCount.addAndFetch(1) % lowPriorityOperationPromotionRate == 0) { - lowPriorityQueue.signalPromoteSingleOp(releaserLock); + // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the + // normal priority queue until the bypass threshold is met. + if (_lowPriorityBypassThreshold > 0 && + _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) { + if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) { + _expeditedLowPriorityAdmissions.addAndFetch(1); + } else { + normalPriorityQueue.attemptToDequeue(sharedQueueLock); } + return; + } + + if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) { + lowPriorityQueue.attemptToDequeue(sharedQueueLock); } } @@ -640,7 +623,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { BSONObjBuilder bbb(b.subobjStart("lowPriority")); const auto& lowPriorityTicketStats = _getQueue(QueueType::kLowPriority).getStatsToUse(); appendCommonQueueImplStats(bbb, lowPriorityTicketStats); - bbb.append("promoted", promoted()); + bbb.append("expedited", expedited()); bbb.append("bypassCount", bypassed()); bbb.done(); } diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 834aa28f1fe..3b0153d1a85 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -284,7 +284,9 @@ private: class PriorityTicketHolder : public TicketHolderWithQueueingStats { protected: public: - explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext); + explicit PriorityTicketHolder(int numTickets, + int lowPriorityBypassThreshold, + ServiceContext* serviceContext); ~PriorityTicketHolder() override; int available() const override final; @@ -295,7 +297,13 @@ public: return true; }; - std::int64_t promoted() const; + void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); + + /** + * Number of times low priority operations are expedited for ticket admission over normal + * priority operations. + */ + std::int64_t expedited() const; /** * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the @@ -314,9 +322,9 @@ private: // // The alternative of using ResourceMutex is not appropriate as the class serves as a // concurrency primitive and is performance sensitive. - using QueueMutex = std::shared_mutex; // NOLINT - using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT - using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT + using QueueMutex = std::shared_mutex; // NOLINT + using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT + using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT enum class QueueType : unsigned int { kLowPriority = 0, @@ -327,39 +335,27 @@ private: QueueTypeSize = 3 }; - enum class AdmissionStatus { - // The ticket request is successful, and the operation is ready to acquire a ticket. - kReadyToAcquire, - // There are no tickets available for the request at its given priority. The priority should - // be increased to allow for ticket acquisition. - kNeedsPromotion, - // An interruption occured while attempting to acquire a ticket. - kInterrupted, - }; - class Queue { public: Queue(PriorityTicketHolder* holder, QueueType queueType) : _holder(holder), _queueType(queueType){}; - bool attemptToDequeue(const ReleaserLockGuard& releaserLock); + bool attemptToDequeue(const SharedLockGuard& sharedQueueLock); - AdmissionStatus enqueue(OperationContext* interruptible, - EnqueuerLockGuard& queueLock, - const Date_t& until, - WaitMode waitMode); + /** + * Returns true if this operation is assigned a ticket, false if the deadline is exceeded + * before ticket acquisition. Throws if the operation is interrupted. + */ + bool enqueue(OperationContext* interruptible, + UniqueLockGuard& queueLock, + const Date_t& until, + WaitMode waitMode); int queuedElems() const { return _queuedThreads; } /** - * Signals that a queued thread should be woken and exit the queue so it can be promoted to - * a higher priority queue. - */ - void signalPromoteSingleOp(const ReleaserLockGuard& releaserLock); - - /** * Returns a reference to the Queue statistics that allows callers to update the statistics. */ QueueStats& getStatsToUse() { @@ -374,11 +370,10 @@ private: } private: - void _signalThreadWoken(const EnqueuerLockGuard& enqueuerLock); + void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock); int _queuedThreads{0}; AtomicWord<int> _threadsToBeWoken{0}; - AtomicWord<bool> _signalPromotion{false}; stdx::condition_variable _cv; PriorityTicketHolder* _holder; @@ -416,13 +411,13 @@ private: * - The number of items in each queue will not change during the execution * - No other thread will proceed to wait during the execution of the method */ - void _dequeueWaitingThread(const ReleaserLockGuard& releaserLock); + void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock); /** * Returns whether there are higher priority threads pending to get a ticket in front of the * given queue type and not enough tickets for all of them. */ - bool _hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, QueueType queue); + bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue); QueueType _queueType(const AdmissionContext* admCtx); @@ -434,13 +429,25 @@ private: QueueMutex _queueMutex; /** + * Limits the number times the low priority queue is non-empty and bypassed in favor of the + * normal priority queue for the next ticket admission. + * + * Updates must be done under the UniqueLockGuard. + */ + int _lowPriorityBypassThreshold; + + /** * Counts the number of times normal operations are dequeued over operations queued in the low * priority queue. */ - AtomicWord<std::int64_t> _lowPriorityBypassCount{0}; + AtomicWord<std::uint64_t> _lowPriorityBypassCount{0}; + + /** + * Number of times ticket admission is expedited for low priority operations. + */ + AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0}; AtomicWord<int> _ticketsAvailable; AtomicWord<int> _enqueuedElements; - AtomicWord<std::int64_t> _promotedElements{0}; ServiceContext* _serviceContext; }; diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp index e9185b7188f..19b75c18bc3 100644 --- a/src/mongo/util/concurrency/ticketholder_bm.cpp +++ b/src/mongo/util/concurrency/ticketholder_bm.cpp @@ -44,6 +44,7 @@ namespace { static int kTickets = 128; static int kThreadMin = 16; static int kThreadMax = 1024; +static int kLowPriorityAdmissionBypassThreshold = 100; static TicketHolder::WaitMode waitMode = TicketHolder::WaitMode::kUninterruptible; template <typename TicketHolderImpl> @@ -55,7 +56,13 @@ public: TicketHolderFixture(int threads, ServiceContext* serviceContext) { - ticketHolder = std::make_unique<TicketHolderImpl>(kTickets, serviceContext); + if constexpr (std::is_same_v<PriorityTicketHolder, TicketHolderImpl>) { + ticketHolder = std::make_unique<TicketHolderImpl>( + kTickets, kLowPriorityAdmissionBypassThreshold, serviceContext); + } else { + ticketHolder = std::make_unique<TicketHolderImpl>(kTickets, serviceContext); + } + for (int i = 0; i < threads; ++i) { clients.push_back( serviceContext->makeClient(str::stream() << "test client for thread " << i)); @@ -64,6 +71,7 @@ public: } }; + static Mutex isReadyMutex; static stdx::condition_variable isReadyCv; static bool isReady = false; diff --git a/src/mongo/util/concurrency/ticketholder_params.idl b/src/mongo/util/concurrency/ticketholder_params.idl deleted file mode 100644 index 770fb7d4ee3..00000000000 --- a/src/mongo/util/concurrency/ticketholder_params.idl +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (C) 2022-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -global: - cpp_namespace: "mongo" - -server_parameters: - - lowPriorityOperationPromotionRate: - description: "When Deprioritization is enabled, specifies the rate at which a low priority -operation will be promoted after being bypassed for normal priority operations n times." - set_at: [ startup, runtime ] - cpp_vartype: AtomicWord<int> - cpp_varname: gLowPriorityOperationPromotionRate - # 0 means low priority operations will never promoted. - default: 20000 - validator: - gte: 0 - diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index d47faa30093..3c7f8b1803c 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -49,6 +49,10 @@ namespace { using namespace mongo; +// By default, tests will create a PriorityTicketHolder where low priority admissions can be +// bypassed an unlimited amount of times in favor of normal priority admissions. +static constexpr int kDefaultLowPriorityAdmissionBypassThreshold = 0; + class TicketHolderTest : public ServiceContextTest { void setUp() override { ServiceContextTest::setUp(); @@ -84,12 +88,8 @@ void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTim } } -template <class H> -void basicTimeout(OperationContext* opCtx) { - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); +void basicTimeout(OperationContext* opCtx, std::unique_ptr<TicketHolderWithQueueingStats> holder) { auto mode = TicketHolder::WaitMode::kInterruptible; - std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext); ASSERT_EQ(holder->used(), 0); ASSERT_EQ(holder->available(), 1); ASSERT_EQ(holder->outof(), 1); @@ -159,10 +159,17 @@ void basicTimeout(OperationContext* opCtx) { } TEST_F(TicketHolderTest, BasicTimeoutSemaphore) { - basicTimeout<SemaphoreTicketHolder>(_opCtx.get()); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + basicTimeout(_opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext)); } + TEST_F(TicketHolderTest, BasicTimeoutPriority) { - basicTimeout<PriorityTicketHolder>(_opCtx.get()); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + basicTimeout(_opCtx.get(), + std::make_unique<PriorityTicketHolder>( + 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext)); } class Stats { @@ -204,15 +211,12 @@ struct MockAdmission { boost::optional<Ticket> ticket; }; -template <class H> -void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) { - // Verify that resize operations don't alter metrics outside of those linked to the number of - // tickets. - ServiceContext serviceContext; - serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - auto mode = TicketHolder::WaitMode::kInterruptible; - std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext); +// Verify that resize operations don't alter metrics outside of those linked to the number of +// tickets. +void resizeTest(OperationContext* opCtx, + std::unique_ptr<TicketHolderWithQueueingStats> holder, + TickSourceMock<Microseconds>* tickSource, + bool testWithOutstandingImmediateOperation = false) { Stats stats(holder.get()); // An outstanding kImmediate priority operation should not impact resize statistics. @@ -226,6 +230,7 @@ void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperat AdmissionContext admCtx; admCtx.setPriority(AdmissionContext::Priority::kNormal); + auto mode = TicketHolder::WaitMode::kInterruptible; auto ticket = holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds{500}, mode); @@ -266,22 +271,53 @@ void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperat } TEST_F(TicketHolderTest, ResizeStatsSemaphore) { - resizeTest<SemaphoreTicketHolder>(_opCtx.get()); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + + resizeTest( + _opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext), tickSource); } + TEST_F(TicketHolderTest, ResizeStatsPriority) { - resizeTest<PriorityTicketHolder>(_opCtx.get()); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + + resizeTest(_opCtx.get(), + std::make_unique<PriorityTicketHolder>( + 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext), + tickSource); } + TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) { - resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + + resizeTest(_opCtx.get(), + std::make_unique<SemaphoreTicketHolder>(1, &serviceContext), + tickSource, + true); } + TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) { - resizeTest<PriorityTicketHolder>(_opCtx.get(), true); + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + + resizeTest(_opCtx.get(), + std::make_unique<PriorityTicketHolder>( + 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext), + tickSource, + true); } TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); + Stats stats(&holder); { @@ -366,7 +402,7 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { TEST_F(TicketHolderTest, OnlyLowPriorityOps) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); Stats stats(&holder); { @@ -488,7 +524,7 @@ TEST_F(TicketHolderTest, OnlyLowPriorityOps) { TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); Stats stats(&holder); { @@ -592,7 +628,7 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); Stats stats(&holder); MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow); @@ -697,7 +733,7 @@ TEST_F(TicketHolderTest, PrioritImmediateMetrics) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); Stats stats(&holder); MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow); @@ -780,7 +816,7 @@ TEST_F(TicketHolderTest, PriorityCanceled) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); - PriorityTicketHolder holder(1, &serviceContext); + PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext); Stats stats(&holder); { MockAdmission lowPriorityAdmission(this->getServiceContext(), @@ -847,13 +883,11 @@ TEST_F(TicketHolderTest, PriorityCanceled) { ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } -TEST_F(TicketHolderTest, PriorityPromotionBasic) { +TEST_F(TicketHolderTest, LowPriorityExpedited) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); - auto lowPriorityPromotionRate = 2; - RAIIServerParameterControllerForTest _promotionController("lowPriorityOperationPromotionRate", - lowPriorityPromotionRate); - PriorityTicketHolder holder(1, &serviceContext); + auto lowPriorityBypassThreshold = 2; + PriorityTicketHolder holder(1, lowPriorityBypassThreshold, &serviceContext); Stats stats(&holder); // Use the GlobalServiceContext to create MockAdmissions. @@ -893,7 +927,7 @@ TEST_F(TicketHolderTest, PriorityPromotionBasic) { initialAdmission.ticket.reset(); assertSoon([&] { - ASSERT_EQ(holder.promoted(), 1); + ASSERT_EQ(holder.expedited(), 1); ASSERT(lowPriorityAdmission.ticket); }); @@ -919,7 +953,7 @@ TEST_F(TicketHolderTest, PriorityPromotionBasic) { ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0); ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); - ASSERT_EQ(lowPriorityStats.getIntField("promoted"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("expedited"), 1); auto normalPriorityStats = currentStats.getObjectField("normalPriority"); ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), queuedNormalAdmissionsCount); |