summaryrefslogtreecommitdiff
path: root/src/mongo/util
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-11-08 10:37:02 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-08 11:10:15 +0000
commit25329194fe4343fd4ef1f3423377da742a47d5d6 (patch)
treea80644d0d7faf7172fb64c7dfa850dfe4728249b /src/mongo/util
parent5e1d12241178f6c6520d57476552b362b5ddf237 (diff)
downloadmongo-25329194fe4343fd4ef1f3423377da742a47d5d6.tar.gz
SERVER-70927 Make PriorityTicketHolder select where to dequeue
Diffstat (limited to 'src/mongo/util')
-rw-r--r--src/mongo/util/concurrency/SConscript9
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp145
-rw-r--r--src/mongo/util/concurrency/ticketholder.h71
-rw-r--r--src/mongo/util/concurrency/ticketholder_bm.cpp10
-rw-r--r--src/mongo/util/concurrency/ticketholder_params.idl44
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp100
6 files changed, 179 insertions, 200 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 51be550e81d..dfcc09d89d5 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -23,14 +23,6 @@ env.Library(
)
env.Library(
- target='ticketholder_params',
- source=['ticketholder_params.idl'],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/server_base',
- ],
-)
-
-env.Library(
target='ticketholder',
source=['ticketholder.cpp'],
LIBDEPS_PRIVATE=[
@@ -38,7 +30,6 @@ env.Library(
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/third_party/shim_boost',
'admission_context',
- 'ticketholder_params',
],
)
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index 2e9e90eeec1..235ac15fe70 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/service_context.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/ticketholder.h"
-#include "mongo/util/concurrency/ticketholder_params_gen.h"
#include <iostream>
@@ -379,13 +378,15 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
}
#endif
-PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext)
+PriorityTicketHolder::PriorityTicketHolder(int numTickets,
+ int lowPriorityBypassThreshold,
+ ServiceContext* serviceContext)
: TicketHolderWithQueueingStats(numTickets, serviceContext),
_queues{Queue(this, QueueType::kLowPriority),
Queue(this, QueueType::kNormalPriority),
Queue(this, QueueType::kImmediatePriority)},
+ _lowPriorityBypassThreshold(lowPriorityBypassThreshold),
_serviceContext(serviceContext) {
-
_ticketsAvailable.store(numTickets);
_enqueuedElements.store(0);
}
@@ -400,8 +401,14 @@ int PriorityTicketHolder::queued() const {
return _enqueuedElements.loadRelaxed();
}
-std::int64_t PriorityTicketHolder::promoted() const {
- return _promotedElements.loadRelaxed();
+void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(
+ const int& newBypassThreshold) {
+ UniqueLockGuard uniqueQueueLock(_queueMutex);
+ _lowPriorityBypassThreshold = newBypassThreshold;
+}
+
+std::int64_t PriorityTicketHolder::expedited() const {
+ return _expeditedLowPriorityAdmissions.loadRelaxed();
}
std::int64_t PriorityTicketHolder::bypassed() const {
@@ -423,14 +430,9 @@ void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) no
//
// Under this lock the queues cannot be modified in terms of someone attempting to enqueue on
// them, only waking threads is allowed.
- ReleaserLockGuard releaserLock(_queueMutex); // NOLINT
+ SharedLockGuard sharedQueueLock(_queueMutex);
_ticketsAvailable.addAndFetch(1);
- if (std::all_of(_queues.begin(), _queues.end(), [](const Queue& queue) {
- return queue.queuedElems() == 0;
- })) {
- return;
- }
- _dequeueWaitingThread(releaserLock);
+ _dequeueWaitingThread(sharedQueueLock);
}
bool PriorityTicketHolder::_tryAcquireTicket() {
@@ -465,33 +467,24 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
auto queueType = _queueType(admCtx);
auto& queue = _getQueue(queueType);
- AdmissionStatus admissionStatus;
+ bool assigned;
{
- EnqueuerLockGuard enqueuerLock(_queueMutex);
+ UniqueLockGuard uniqueQueueLock(_queueMutex);
_enqueuedElements.addAndFetch(1);
ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
- admissionStatus = queue.enqueue(opCtx, enqueuerLock, until, waitMode);
-
- if (admissionStatus == AdmissionStatus::kNeedsPromotion) {
- invariant(queueType == QueueType::kLowPriority);
-
- _promotedElements.fetchAndAdd(1);
- auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
- admissionStatus = normalPriorityQueue.enqueue(opCtx, enqueuerLock, until, waitMode);
- }
+ assigned = queue.enqueue(opCtx, uniqueQueueLock, until, waitMode);
}
- if (admissionStatus == AdmissionStatus::kReadyToAcquire) {
+ if (assigned) {
return Ticket{this, admCtx};
} else {
return boost::none;
}
}
-bool PriorityTicketHolder::_hasToWaitForHigherPriority(const EnqueuerLockGuard& lk,
- QueueType queue) {
+bool PriorityTicketHolder::_hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue) {
switch (queue) {
case QueueType::kLowPriority: {
const auto& normalQueue = _getQueue(QueueType::kNormalPriority);
@@ -511,9 +504,9 @@ void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept {
if (difference > 0) {
// As we're adding tickets the waiting threads need to be notified that there are new
// tickets available.
- ReleaserLockGuard releaserLock(_queueMutex);
+ SharedLockGuard sharedQueueLock(_queueMutex);
for (int i = 0; i < difference; i++) {
- _dequeueWaitingThread(releaserLock);
+ _dequeueWaitingThread(sharedQueueLock);
}
}
@@ -521,7 +514,7 @@ void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept {
// have to wait until the current ticket holders release their tickets.
}
-bool PriorityTicketHolder::Queue::attemptToDequeue(const ReleaserLockGuard& releaserLock) {
+bool PriorityTicketHolder::Queue::attemptToDequeue(const SharedLockGuard& sharedQueueLock) {
auto threadsToBeWoken = _threadsToBeWoken.load();
while (threadsToBeWoken < _queuedThreads) {
auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1);
@@ -533,7 +526,7 @@ bool PriorityTicketHolder::Queue::attemptToDequeue(const ReleaserLockGuard& rele
return false;
}
-void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& enqueuerLock) {
+void PriorityTicketHolder::Queue::_signalThreadWoken(const UniqueLockGuard& uniqueQueueLock) {
auto currentThreadsToBeWoken = _threadsToBeWoken.load();
while (currentThreadsToBeWoken > 0) {
if (_threadsToBeWoken.compareAndSwap(&currentThreadsToBeWoken,
@@ -543,11 +536,10 @@ void PriorityTicketHolder::Queue::_signalThreadWoken(const EnqueuerLockGuard& en
}
}
-PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue(
- OperationContext* opCtx,
- EnqueuerLockGuard& enqueuerLock,
- const Date_t& until,
- WaitMode waitMode) {
+bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx,
+ UniqueLockGuard& uniqueQueueLock,
+ const Date_t& until,
+ WaitMode waitMode) {
_queuedThreads++;
// Before exiting we remove ourselves from the count of queued threads, we are still holding the
// lock here so this is safe.
@@ -568,70 +560,61 @@ PriorityTicketHolder::AdmissionStatus PriorityTicketHolder::Queue::enqueue(
// woken after the condition variable wait, not before which is where the predicate would
// go.
while (_holder->_ticketsAvailable.load() <= 0 ||
- _holder->_hasToWaitForHigherPriority(enqueuerLock, _queueType)) {
+ _holder->_hasToWaitForHigherPriority(uniqueQueueLock, _queueType)) {
// This method must be called after getting woken in all cases, so we use a ScopeGuard
// to handle exceptions as well as early returns.
- ON_BLOCK_EXIT([&] { _signalThreadWoken(enqueuerLock); });
+ ON_BLOCK_EXIT([&] { _signalThreadWoken(uniqueQueueLock); });
auto waitResult =
- clockSource->waitForConditionUntil(_cv, enqueuerLock, deadline, baton);
+ clockSource->waitForConditionUntil(_cv, uniqueQueueLock, deadline, baton);
// We check if the operation has been interrupted (timeout, killed, etc.) here.
if (waitMode == WaitMode::kInterruptible) {
opCtx->checkForInterrupt();
}
if (waitResult == stdx::cv_status::timeout)
- return AdmissionStatus::kInterrupted;
-
- bool needsPromotion = _signalPromotion.swap(false);
- if (needsPromotion) {
- // The thread is woken for promotion, which is different than a thread woken to
- // acquire a ticket / a product of an interruption.
- return PriorityTicketHolder::AdmissionStatus::kNeedsPromotion;
- }
+ return false;
}
} while (!_holder->_tryAcquireTicket());
- return AdmissionStatus::kReadyToAcquire;
-}
-
-void PriorityTicketHolder::Queue::signalPromoteSingleOp(const ReleaserLockGuard& releaserLock) {
- // Only signal a promotion if there exists an operation to promote.
- //
- // Additionally, only modify _signalPromotion if it was false to begin. This prevents the
- // following race (as multiple releasers may call this code concurrently):
- // . ReleaserA attempts to dequeue, succeeds, _signalPromotion is set to true
- // . ReleaserB attempts to dequeue, fails, _signalPromotion is set to false before the
- // woken thread has a chance to check whether it should be promoted.
- //
- //
- bool permittedOriginalValue = false;
- _signalPromotion.compareAndSwap(&permittedOriginalValue, attemptToDequeue(releaserLock));
+ return true;
}
-void PriorityTicketHolder::_dequeueWaitingThread(const ReleaserLockGuard& releaserLock) {
- // There should never be anything to dequeue from 'QueueType::kImmediatePriority' since
- // 'kImmediate' operations should always bypass the need to queue.
+void PriorityTicketHolder::_dequeueWaitingThread(const SharedLockGuard& sharedQueueLock) {
+ // There are only 2 possible queues to dequeue from - the low priority and normal priority
+ // queues. There will never be anything to dequeue from the immediate priority queue, given
+ // immediate priority operations will never wait for ticket admission.
+ auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority);
auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
- if (!normalPriorityQueue.attemptToDequeue(releaserLock)) {
- _getQueue(QueueType::kLowPriority).attemptToDequeue(releaserLock);
+
+ // There is a guarantee that the number of queued elements cannot change while holding the
+ // shared queue lock.
+ auto lowQueueCount = lowPriorityQueue.queuedElems();
+ auto normalQueueCount = normalPriorityQueue.queuedElems();
+
+ if (lowQueueCount == 0 && normalQueueCount == 0) {
return;
}
-
- auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority);
- if (lowPriorityQueue.queuedElems() == 0) {
- // Dequeueing from the normal queue didn't bypass any operations waiting in the low priority
- // queue, return early.
+ if (lowQueueCount == 0) {
+ normalPriorityQueue.attemptToDequeue(sharedQueueLock);
+ return;
+ }
+ if (normalQueueCount == 0) {
+ lowPriorityQueue.attemptToDequeue(sharedQueueLock);
return;
}
- // To prevent starvation, record the number of times operations in the low priority queue are
- // bypassed in favor of dequeueing from the normal queue. If operations are bypassed enough, it
- // may be time to promote a low priority operation to give it a chance to eventually run.
- //
- // A value of 0 implies low priority operations are never to be promoted.
- if (auto lowPriorityOperationPromotionRate = gLowPriorityOperationPromotionRate.load();
- lowPriorityOperationPromotionRate > 0) {
- if (_lowPriorityBypassCount.addAndFetch(1) % lowPriorityOperationPromotionRate == 0) {
- lowPriorityQueue.signalPromoteSingleOp(releaserLock);
+ // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the
+ // normal priority queue until the bypass threshold is met.
+ if (_lowPriorityBypassThreshold > 0 &&
+ _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) {
+ if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) {
+ _expeditedLowPriorityAdmissions.addAndFetch(1);
+ } else {
+ normalPriorityQueue.attemptToDequeue(sharedQueueLock);
}
+ return;
+ }
+
+ if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) {
+ lowPriorityQueue.attemptToDequeue(sharedQueueLock);
}
}
@@ -640,7 +623,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
BSONObjBuilder bbb(b.subobjStart("lowPriority"));
const auto& lowPriorityTicketStats = _getQueue(QueueType::kLowPriority).getStatsToUse();
appendCommonQueueImplStats(bbb, lowPriorityTicketStats);
- bbb.append("promoted", promoted());
+ bbb.append("expedited", expedited());
bbb.append("bypassCount", bypassed());
bbb.done();
}
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 834aa28f1fe..3b0153d1a85 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -284,7 +284,9 @@ private:
class PriorityTicketHolder : public TicketHolderWithQueueingStats {
protected:
public:
- explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext);
+ explicit PriorityTicketHolder(int numTickets,
+ int lowPriorityBypassThreshold,
+ ServiceContext* serviceContext);
~PriorityTicketHolder() override;
int available() const override final;
@@ -295,7 +297,13 @@ public:
return true;
};
- std::int64_t promoted() const;
+ void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
+
+ /**
+ * Number of times low priority operations are expedited for ticket admission over normal
+ * priority operations.
+ */
+ std::int64_t expedited() const;
/**
* Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
@@ -314,9 +322,9 @@ private:
//
// The alternative of using ResourceMutex is not appropriate as the class serves as a
// concurrency primitive and is performance sensitive.
- using QueueMutex = std::shared_mutex; // NOLINT
- using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT
- using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT
+ using QueueMutex = std::shared_mutex; // NOLINT
+ using SharedLockGuard = std::shared_lock<QueueMutex>; // NOLINT
+ using UniqueLockGuard = std::unique_lock<QueueMutex>; // NOLINT
enum class QueueType : unsigned int {
kLowPriority = 0,
@@ -327,39 +335,27 @@ private:
QueueTypeSize = 3
};
- enum class AdmissionStatus {
- // The ticket request is successful, and the operation is ready to acquire a ticket.
- kReadyToAcquire,
- // There are no tickets available for the request at its given priority. The priority should
- // be increased to allow for ticket acquisition.
- kNeedsPromotion,
- // An interruption occured while attempting to acquire a ticket.
- kInterrupted,
- };
-
class Queue {
public:
Queue(PriorityTicketHolder* holder, QueueType queueType)
: _holder(holder), _queueType(queueType){};
- bool attemptToDequeue(const ReleaserLockGuard& releaserLock);
+ bool attemptToDequeue(const SharedLockGuard& sharedQueueLock);
- AdmissionStatus enqueue(OperationContext* interruptible,
- EnqueuerLockGuard& queueLock,
- const Date_t& until,
- WaitMode waitMode);
+ /**
+ * Returns true if this operation is assigned a ticket, false if the deadline is exceeded
+ * before ticket acquisition. Throws if the operation is interrupted.
+ */
+ bool enqueue(OperationContext* interruptible,
+ UniqueLockGuard& queueLock,
+ const Date_t& until,
+ WaitMode waitMode);
int queuedElems() const {
return _queuedThreads;
}
/**
- * Signals that a queued thread should be woken and exit the queue so it can be promoted to
- * a higher priority queue.
- */
- void signalPromoteSingleOp(const ReleaserLockGuard& releaserLock);
-
- /**
* Returns a reference to the Queue statistics that allows callers to update the statistics.
*/
QueueStats& getStatsToUse() {
@@ -374,11 +370,10 @@ private:
}
private:
- void _signalThreadWoken(const EnqueuerLockGuard& enqueuerLock);
+ void _signalThreadWoken(const UniqueLockGuard& uniqueQueueLock);
int _queuedThreads{0};
AtomicWord<int> _threadsToBeWoken{0};
- AtomicWord<bool> _signalPromotion{false};
stdx::condition_variable _cv;
PriorityTicketHolder* _holder;
@@ -416,13 +411,13 @@ private:
* - The number of items in each queue will not change during the execution
* - No other thread will proceed to wait during the execution of the method
*/
- void _dequeueWaitingThread(const ReleaserLockGuard& releaserLock);
+ void _dequeueWaitingThread(const SharedLockGuard& sharedQueueLock);
/**
* Returns whether there are higher priority threads pending to get a ticket in front of the
* given queue type and not enough tickets for all of them.
*/
- bool _hasToWaitForHigherPriority(const EnqueuerLockGuard& lk, QueueType queue);
+ bool _hasToWaitForHigherPriority(const UniqueLockGuard& lk, QueueType queue);
QueueType _queueType(const AdmissionContext* admCtx);
@@ -434,13 +429,25 @@ private:
QueueMutex _queueMutex;
/**
+ * Limits the number times the low priority queue is non-empty and bypassed in favor of the
+ * normal priority queue for the next ticket admission.
+ *
+ * Updates must be done under the UniqueLockGuard.
+ */
+ int _lowPriorityBypassThreshold;
+
+ /**
* Counts the number of times normal operations are dequeued over operations queued in the low
* priority queue.
*/
- AtomicWord<std::int64_t> _lowPriorityBypassCount{0};
+ AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+
+ /**
+ * Number of times ticket admission is expedited for low priority operations.
+ */
+ AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
AtomicWord<int> _ticketsAvailable;
AtomicWord<int> _enqueuedElements;
- AtomicWord<std::int64_t> _promotedElements{0};
ServiceContext* _serviceContext;
};
diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp
index e9185b7188f..19b75c18bc3 100644
--- a/src/mongo/util/concurrency/ticketholder_bm.cpp
+++ b/src/mongo/util/concurrency/ticketholder_bm.cpp
@@ -44,6 +44,7 @@ namespace {
static int kTickets = 128;
static int kThreadMin = 16;
static int kThreadMax = 1024;
+static int kLowPriorityAdmissionBypassThreshold = 100;
static TicketHolder::WaitMode waitMode = TicketHolder::WaitMode::kUninterruptible;
template <typename TicketHolderImpl>
@@ -55,7 +56,13 @@ public:
TicketHolderFixture(int threads, ServiceContext* serviceContext) {
- ticketHolder = std::make_unique<TicketHolderImpl>(kTickets, serviceContext);
+ if constexpr (std::is_same_v<PriorityTicketHolder, TicketHolderImpl>) {
+ ticketHolder = std::make_unique<TicketHolderImpl>(
+ kTickets, kLowPriorityAdmissionBypassThreshold, serviceContext);
+ } else {
+ ticketHolder = std::make_unique<TicketHolderImpl>(kTickets, serviceContext);
+ }
+
for (int i = 0; i < threads; ++i) {
clients.push_back(
serviceContext->makeClient(str::stream() << "test client for thread " << i));
@@ -64,6 +71,7 @@ public:
}
};
+
static Mutex isReadyMutex;
static stdx::condition_variable isReadyCv;
static bool isReady = false;
diff --git a/src/mongo/util/concurrency/ticketholder_params.idl b/src/mongo/util/concurrency/ticketholder_params.idl
deleted file mode 100644
index 770fb7d4ee3..00000000000
--- a/src/mongo/util/concurrency/ticketholder_params.idl
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright (C) 2022-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-global:
- cpp_namespace: "mongo"
-
-server_parameters:
-
- lowPriorityOperationPromotionRate:
- description: "When Deprioritization is enabled, specifies the rate at which a low priority
-operation will be promoted after being bypassed for normal priority operations n times."
- set_at: [ startup, runtime ]
- cpp_vartype: AtomicWord<int>
- cpp_varname: gLowPriorityOperationPromotionRate
- # 0 means low priority operations will never promoted.
- default: 20000
- validator:
- gte: 0
-
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index d47faa30093..3c7f8b1803c 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -49,6 +49,10 @@
namespace {
using namespace mongo;
+// By default, tests will create a PriorityTicketHolder where low priority admissions can be
+// bypassed an unlimited amount of times in favor of normal priority admissions.
+static constexpr int kDefaultLowPriorityAdmissionBypassThreshold = 0;
+
class TicketHolderTest : public ServiceContextTest {
void setUp() override {
ServiceContextTest::setUp();
@@ -84,12 +88,8 @@ void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTim
}
}
-template <class H>
-void basicTimeout(OperationContext* opCtx) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+void basicTimeout(OperationContext* opCtx, std::unique_ptr<TicketHolderWithQueueingStats> holder) {
auto mode = TicketHolder::WaitMode::kInterruptible;
- std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext);
ASSERT_EQ(holder->used(), 0);
ASSERT_EQ(holder->available(), 1);
ASSERT_EQ(holder->outof(), 1);
@@ -159,10 +159,17 @@ void basicTimeout(OperationContext* opCtx) {
}
TEST_F(TicketHolderTest, BasicTimeoutSemaphore) {
- basicTimeout<SemaphoreTicketHolder>(_opCtx.get());
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ basicTimeout(_opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext));
}
+
TEST_F(TicketHolderTest, BasicTimeoutPriority) {
- basicTimeout<PriorityTicketHolder>(_opCtx.get());
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ basicTimeout(_opCtx.get(),
+ std::make_unique<PriorityTicketHolder>(
+ 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext));
}
class Stats {
@@ -204,15 +211,12 @@ struct MockAdmission {
boost::optional<Ticket> ticket;
};
-template <class H>
-void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) {
- // Verify that resize operations don't alter metrics outside of those linked to the number of
- // tickets.
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- auto mode = TicketHolder::WaitMode::kInterruptible;
- std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext);
+// Verify that resize operations don't alter metrics outside of those linked to the number of
+// tickets.
+void resizeTest(OperationContext* opCtx,
+ std::unique_ptr<TicketHolderWithQueueingStats> holder,
+ TickSourceMock<Microseconds>* tickSource,
+ bool testWithOutstandingImmediateOperation = false) {
Stats stats(holder.get());
// An outstanding kImmediate priority operation should not impact resize statistics.
@@ -226,6 +230,7 @@ void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperat
AdmissionContext admCtx;
admCtx.setPriority(AdmissionContext::Priority::kNormal);
+ auto mode = TicketHolder::WaitMode::kInterruptible;
auto ticket =
holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds{500}, mode);
@@ -266,22 +271,53 @@ void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperat
}
TEST_F(TicketHolderTest, ResizeStatsSemaphore) {
- resizeTest<SemaphoreTicketHolder>(_opCtx.get());
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+
+ resizeTest(
+ _opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext), tickSource);
}
+
TEST_F(TicketHolderTest, ResizeStatsPriority) {
- resizeTest<PriorityTicketHolder>(_opCtx.get());
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+
+ resizeTest(_opCtx.get(),
+ std::make_unique<PriorityTicketHolder>(
+ 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext),
+ tickSource);
}
+
TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) {
- resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true);
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+
+ resizeTest(_opCtx.get(),
+ std::make_unique<SemaphoreTicketHolder>(1, &serviceContext),
+ tickSource,
+ true);
}
+
TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) {
- resizeTest<PriorityTicketHolder>(_opCtx.get(), true);
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+
+ resizeTest(_opCtx.get(),
+ std::make_unique<PriorityTicketHolder>(
+ 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext),
+ tickSource,
+ true);
}
TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+
Stats stats(&holder);
{
@@ -366,7 +402,7 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
TEST_F(TicketHolderTest, OnlyLowPriorityOps) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
Stats stats(&holder);
{
@@ -488,7 +524,7 @@ TEST_F(TicketHolderTest, OnlyLowPriorityOps) {
TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
Stats stats(&holder);
{
@@ -592,7 +628,7 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
Stats stats(&holder);
MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow);
@@ -697,7 +733,7 @@ TEST_F(TicketHolderTest, PrioritImmediateMetrics) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
Stats stats(&holder);
MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow);
@@ -780,7 +816,7 @@ TEST_F(TicketHolderTest, PriorityCanceled) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- PriorityTicketHolder holder(1, &serviceContext);
+ PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
Stats stats(&holder);
{
MockAdmission lowPriorityAdmission(this->getServiceContext(),
@@ -847,13 +883,11 @@ TEST_F(TicketHolderTest, PriorityCanceled) {
ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
-TEST_F(TicketHolderTest, PriorityPromotionBasic) {
+TEST_F(TicketHolderTest, LowPriorityExpedited) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- auto lowPriorityPromotionRate = 2;
- RAIIServerParameterControllerForTest _promotionController("lowPriorityOperationPromotionRate",
- lowPriorityPromotionRate);
- PriorityTicketHolder holder(1, &serviceContext);
+ auto lowPriorityBypassThreshold = 2;
+ PriorityTicketHolder holder(1, lowPriorityBypassThreshold, &serviceContext);
Stats stats(&holder);
// Use the GlobalServiceContext to create MockAdmissions.
@@ -893,7 +927,7 @@ TEST_F(TicketHolderTest, PriorityPromotionBasic) {
initialAdmission.ticket.reset();
assertSoon([&] {
- ASSERT_EQ(holder.promoted(), 1);
+ ASSERT_EQ(holder.expedited(), 1);
ASSERT(lowPriorityAdmission.ticket);
});
@@ -919,7 +953,7 @@ TEST_F(TicketHolderTest, PriorityPromotionBasic) {
ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0);
ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
- ASSERT_EQ(lowPriorityStats.getIntField("promoted"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("expedited"), 1);
auto normalPriorityStats = currentStats.getObjectField("normalPriority");
ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), queuedNormalAdmissionsCount);