summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency
diff options
context:
space:
mode:
authorLouis Williams <louis.williams@mongodb.com>2023-03-28 11:58:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-28 13:31:34 +0000
commit3e46a0ef81dfe95c11d9c5324fb86758b155363d (patch)
tree4765dcec524645178d080d79fca5db2ceb302a34 /src/mongo/util/concurrency
parenta6ac9a1fe19da49b7067e802bf498d7fc0dcb5af (diff)
downloadmongo-3e46a0ef81dfe95c11d9c5324fb86758b155363d.tar.gz
SERVER-74778 Refactor and improve performance of PriorityTicketHolder
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r--src/mongo/util/concurrency/SConscript16
-rw-r--r--src/mongo/util/concurrency/admission_context.cpp1
-rw-r--r--src/mongo/util/concurrency/admission_context.h11
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.cpp191
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.h178
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder_test.cpp80
-rw-r--r--src/mongo/util/concurrency/semaphore_ticketholder.cpp4
-rw-r--r--src/mongo/util/concurrency/semaphore_ticketholder.h2
-rw-r--r--src/mongo/util/concurrency/semaphore_ticketholder_test.cpp8
-rw-r--r--src/mongo/util/concurrency/ticket_broker.h148
-rw-r--r--src/mongo/util/concurrency/ticket_pool.cpp (renamed from src/mongo/util/concurrency/ticket_broker.cpp)177
-rw-r--r--src/mongo/util/concurrency/ticket_pool.h172
-rw-r--r--src/mongo/util/concurrency/ticket_pool_test.cpp (renamed from src/mongo/util/concurrency/ticket_broker_test.cpp)58
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp39
-rw-r--r--src/mongo/util/concurrency/ticketholder.h153
-rw-r--r--src/mongo/util/concurrency/ticketholder_bm.cpp6
-rw-r--r--src/mongo/util/concurrency/ticketholder_test_fixture.cpp25
-rw-r--r--src/mongo/util/concurrency/ticketholder_test_fixture.h11
18 files changed, 603 insertions, 677 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index e0315f88c3c..3a9380483ee 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -22,22 +22,12 @@ env.Library(
],
)
-# TODO SERVER-72616: This can go away once TicketBroker is implemented in terms of atomic
-# wait/notify in C++20.
-if env.TargetOSIs('linux'):
- env.Library(
- target='ticket_broker',
- source=['ticket_broker.cpp'],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- ],
- )
-
env.Library(
target='ticketholder',
source=[
'priority_ticketholder.cpp' if env.TargetOSIs('linux') else [],
'semaphore_ticketholder.cpp',
+ 'ticket_pool.cpp' if env.TargetOSIs('linux') else [],
'ticketholder.cpp',
],
LIBDEPS_PRIVATE=[
@@ -47,7 +37,6 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_engine_feature_flags',
'$BUILD_DIR/third_party/shim_boost',
'admission_context',
- 'ticket_broker' if env.TargetOSIs('linux') else []
],
)
@@ -72,7 +61,7 @@ env.CppUnitTest(
'spin_lock_test.cpp',
'thread_pool_test.cpp',
'ticketholder_test_fixture.cpp',
- 'ticket_broker_test.cpp' if env.TargetOSIs('linux') else [],
+ 'ticket_pool_test.cpp' if env.TargetOSIs('linux') else [],
'with_lock_test.cpp',
],
LIBDEPS=[
@@ -81,7 +70,6 @@ env.CppUnitTest(
'spin_lock',
'thread_pool',
'thread_pool_test_fixture',
- 'ticket_broker' if env.TargetOSIs('linux') else [],
'ticketholder',
],
)
diff --git a/src/mongo/util/concurrency/admission_context.cpp b/src/mongo/util/concurrency/admission_context.cpp
index 4873133cc71..5c8f8f28508 100644
--- a/src/mongo/util/concurrency/admission_context.cpp
+++ b/src/mongo/util/concurrency/admission_context.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
diff --git a/src/mongo/util/concurrency/admission_context.h b/src/mongo/util/concurrency/admission_context.h
index 7ac97176555..0e52a5860e3 100644
--- a/src/mongo/util/concurrency/admission_context.h
+++ b/src/mongo/util/concurrency/admission_context.h
@@ -28,7 +28,7 @@
*/
#pragma once
-#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/base/string_data.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/tick_source.h"
@@ -84,14 +84,6 @@ public:
return admissions.loadRelaxed();
}
- void setLockMode(LockMode lockMode) {
- _lockMode.store(lockMode);
- }
-
- LockMode getLockMode() const {
- return _lockMode.loadRelaxed();
- }
-
void setPriority(Priority priority) {
_priority.store(priority);
}
@@ -108,7 +100,6 @@ private:
// semantics.
AtomicWord<TickSource::Tick> _startProcessingTime{0};
AtomicWord<int32_t> admissions{0};
- AtomicWord<LockMode> _lockMode{LockMode::MODE_NONE};
AtomicWord<Priority> _priority{Priority::kNormal};
};
diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp
index cb3e6a3bb7c..aa8e581fcf2 100644
--- a/src/mongo/util/concurrency/priority_ticketholder.cpp
+++ b/src/mongo/util/concurrency/priority_ticketholder.cpp
@@ -41,61 +41,50 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
+
PriorityTicketHolder::PriorityTicketHolder(int32_t numTickets,
int32_t lowPriorityBypassThreshold,
ServiceContext* serviceContext)
- : TicketHolderWithQueueingStats(numTickets, serviceContext),
- _lowPriorityBypassThreshold(lowPriorityBypassThreshold),
- _ticketsAvailable(numTickets),
- _serviceContext(serviceContext) {}
+ : TicketHolder(numTickets, serviceContext), _serviceContext(serviceContext) {
+
+ auto queue = std::make_unique<SimplePriorityTicketQueue>(lowPriorityBypassThreshold);
+ _pool = std::make_unique<TicketPool>(numTickets, std::move(queue));
+}
+
+int32_t PriorityTicketHolder::available() const {
+ return _pool->available();
+}
+
+int64_t PriorityTicketHolder::queued() const {
+ return _pool->queued();
+}
int64_t PriorityTicketHolder::numFinishedProcessing() const {
return _stats[_enumToInt(QueueType::kLowPriority)].totalFinishedProcessing.load() +
_stats[_enumToInt(QueueType::kNormalPriority)].totalFinishedProcessing.load();
}
-void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(
- const int32_t& newBypassThreshold) {
- stdx::unique_lock<stdx::mutex> growthLock(_growthMutex);
- _lowPriorityBypassThreshold = newBypassThreshold;
+int64_t PriorityTicketHolder::expedited() const {
+ return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->expedited();
+}
+
+int64_t PriorityTicketHolder::bypassed() const {
+ return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->bypassed();
+}
+
+void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) {
+ auto queue = static_cast<SimplePriorityTicketQueue*>(_pool->getQueue());
+ queue->updateLowPriorityAdmissionBypassThreshold(newBypassThreshold);
}
boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
invariant(admCtx);
- // Handed over tickets to queued waiters do not affect this path since they are not accounted
- // for in the general ticketsAvailable counter.
- auto hasAcquired = _tryAcquireTicket();
- if (hasAcquired) {
- return Ticket{this, admCtx};
+ if (_pool->tryAcquire()) {
+ return Ticket(this, admCtx);
}
- return boost::none;
-}
-TicketBroker::WaitingResult PriorityTicketHolder::_attemptToAcquireTicket(
- TicketBroker& ticketBroker, Date_t deadline, Milliseconds maxWaitTime) {
- // We are going to enter the broker as a waiter, so we must block releasers momentarily before
- // registering ourselves as a waiter. Otherwise we risk missing a ticket.
- stdx::unique_lock growthLock(_growthMutex);
- // Check if a ticket became present in the general pool. This prevents a potential
- // deadlock if the following were to happen without a tryAcquire:
- // * Thread A proceeds to wait for a ticket to be handed over but before it acquires the
- // growthLock gets descheduled.
- // * Thread B releases a ticket, sees no waiters and releases to the general pool.
- // * Thread A acquires the lock and proceeds to wait.
- //
- // In this scenario Thread A would spin indefinitely since it never picks up that there is a
- // ticket in the general pool. It would wait until another thread comes in and hands over a
- // ticket.
- if (_tryAcquireTicket()) {
- TicketBroker::WaitingResult result;
- result.hasTimedOut = false;
- result.hasTicket = true;
- return result;
- }
- // We wait for a tiny bit before checking for interruption.
- auto maxUntil = std::min(deadline, Date_t::now() + maxWaitTime);
- return ticketBroker.attemptWaitForTicketUntil(std::move(growthLock), maxUntil);
+ return boost::none;
}
boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
@@ -103,79 +92,60 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
Date_t until) {
invariant(admCtx);
- auto queueType = _getQueueType(admCtx);
- auto& ticketBroker = _getBroker(queueType);
-
while (true) {
- // We attempt to acquire a ticket for a period of time. This may or may not succeed, in
- // which case we will retry until timing out or getting interrupted.
- auto waitingResult = _attemptToAcquireTicket(ticketBroker, until, Milliseconds{500});
+ // To support interruptibility of ticket acquisition, we attempt to acquire a ticket for a
+ // maximum period of time. This may or may not succeed, in which case we will retry until
+ // timing out or getting interrupted.
+ auto maxUntil = std::min(until, Date_t::now() + Milliseconds(500));
+ auto acquired = _pool->acquire(admCtx, maxUntil);
ScopeGuard rereleaseIfTimedOutOrInterrupted([&] {
// We may have gotten a ticket that we can't use, release it back to the ticket pool.
- if (waitingResult.hasTicket) {
- _releaseToTicketPoolImpl(admCtx);
+ if (acquired) {
+ _pool->release();
}
});
+
if (opCtx) {
opCtx->checkForInterrupt();
}
- auto hasTimedOut = waitingResult.hasTimedOut;
- if (hasTimedOut && Date_t::now() > until) {
+
+ if (acquired) {
+ rereleaseIfTimedOutOrInterrupted.dismiss();
+ return Ticket(this, admCtx);
+ } else if (maxUntil == until) {
+ // We hit the end of our deadline, so return nothing.
return boost::none;
}
- // We haven't been interrupted or timed out, so we may have a valid ticket present.
- rereleaseIfTimedOutOrInterrupted.dismiss();
- if (waitingResult.hasTicket) {
- return Ticket{this, admCtx};
- }
+ // We hit the periodic deadline, but are still within the caller's deadline, so retry.
}
+
+ return boost::none;
}
void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
// 'Immediate' priority operations should bypass the ticketing system completely.
invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
-
- // We will now proceed to perform dequeueing, we must acquire the growth mutex in order to
- // prevent new enqueuers.
- stdx::unique_lock growthLock(_growthMutex);
-
- auto hasWokenThread = _dequeueWaitingThread(growthLock);
- if (!hasWokenThread) {
- // There's no-one in the queue left to wake, so we give the ticket back for general
- // availability.
- _ticketsAvailable.addAndFetch(1);
- }
+ _pool->release();
}
void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept {
auto difference = newSize - oldSize;
if (difference > 0) {
- // As we're adding tickets the waiting threads need to be notified that there are new
- // tickets available.
- stdx::unique_lock dequeuerLock(_growthMutex);
- for (int32_t i = 0; i < difference; i++) {
- auto hasWokenThread = _dequeueWaitingThread(dequeuerLock);
- if (!hasWokenThread) {
- // There's no-one in the brokers left to wake, so we give the ticket back for
- // general availability.
- _ticketsAvailable.addAndFetch(1);
- }
+ // Hand out tickets one-by-one until we've given them all out.
+ for (auto remaining = difference; remaining > 0; remaining--) {
+ _pool->release();
}
} else {
AdmissionContext admCtx;
- for (int32_t i = 0; i < std::abs(difference); i++) {
- // This operation is uninterruptible as the resize operation is conceptually atomic.
- // Cancelling the resize and leaving it in-between the old size and the new one is not
- // allowed.
- auto ticket = _waitForTicketUntilImpl(nullptr, &admCtx, Date_t::max());
- invariant(ticket);
- ticket->discard();
+ // Take tickets one-by-one without releasing.
+ for (auto remaining = -difference; remaining > 0; remaining--) {
+ _pool->acquire(&admCtx, Date_t::max());
}
}
}
-TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
+TicketHolder::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
const AdmissionContext* admCtx) noexcept {
auto queueType = _getQueueType(admCtx);
return _stats[_enumToInt(queueType)];
@@ -196,61 +166,6 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
_appendCommonQueueImplStats(bbb, normalPriorityTicketStats);
bbb.done();
}
- b.append("immediatePriorityAdmissionsCount", getImmediatePriorityAdmissionsCount());
-}
-
-bool PriorityTicketHolder::_tryAcquireTicket() {
- // Test, then test and set to avoid invalidating a cache line unncessarily.
- if (_ticketsAvailable.load() <= 0) {
- return false;
- }
- auto remaining = _ticketsAvailable.subtractAndFetch(1);
- if (remaining < 0) {
- _ticketsAvailable.addAndFetch(1);
- return false;
- }
- return true;
-}
-
-bool PriorityTicketHolder::_dequeueWaitingThread(const stdx::unique_lock<stdx::mutex>& growthLock) {
- // There are only 2 possible brokers to transfer our ticket to - the low priority and normal
- // priority brokers. There will never be anything to transfer to the immediate priority broker,
- // given immediate priority operations will never wait for ticket admission.
- auto& lowPriorityBroker = _getBroker(QueueType::kLowPriority);
- auto& normalPriorityBroker = _getBroker(QueueType::kNormalPriority);
-
- // There is a guarantee that the number of waiters will not increase while holding the growth
- // lock. This check is safe as long as we only compare it against an upper bound.
- auto lowPrioWaiting = lowPriorityBroker.waitingThreads(growthLock);
- auto normalPrioWaiting = normalPriorityBroker.waitingThreads(growthLock);
-
- if (lowPrioWaiting == 0 && normalPrioWaiting == 0) {
- return false;
- }
- if (lowPrioWaiting == 0) {
- return normalPriorityBroker.attemptToTransferTicket(growthLock);
- }
- if (normalPrioWaiting == 0) {
- return lowPriorityBroker.attemptToTransferTicket(growthLock);
- }
-
- // Both brokers are non-empty, and the low priority broker is bypassed for release in favor of
- // the normal priority broker until the bypass threshold is met.
- if (_lowPriorityBypassThreshold > 0 &&
- _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) {
- if (lowPriorityBroker.attemptToTransferTicket(growthLock)) {
- _expeditedLowPriorityAdmissions.addAndFetch(1);
- return true;
- } else {
- return normalPriorityBroker.attemptToTransferTicket(growthLock);
- }
- }
-
- if (!normalPriorityBroker.attemptToTransferTicket(growthLock)) {
- return lowPriorityBroker.attemptToTransferTicket(growthLock);
- } else {
- return true;
- }
}
} // namespace mongo
diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h
index a46d1bca0b2..54c07e640b1 100644
--- a/src/mongo/util/concurrency/priority_ticketholder.h
+++ b/src/mongo/util/concurrency/priority_ticketholder.h
@@ -36,42 +36,62 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/concurrency/ticket_broker.h"
+#include "mongo/util/concurrency/ticket_pool.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
namespace mongo {
+namespace {
+enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, NumQueues = 2 };
+
+} // namespace
+
class Ticket;
+
/**
- * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get
- * placed in a specific internal queue according to some logic. Releasers will wake up a waiter
- * from a group chosen according to some logic.
- *
- * MODIFICATIONS TO THIS CLASS MUST BE ACCOMPANIED BY AN UPDATE OF
- * src/mongo/tla_plus/PriorityTicketHolder/MCPriorityTicketHolder.tla TO ENSURE IT IS CORRECT
+ * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority
+ * operations into separate queues. Normal priority operations are always scheduled ahead of low
+ * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter
+ * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead
+ * of waiting normal priority operations.
*/
-class PriorityTicketHolder : public TicketHolderWithQueueingStats {
+class SimplePriorityTicketQueue : public TicketQueue {
public:
- explicit PriorityTicketHolder(int32_t numTickets,
- int32_t lowPriorityBypassThreshold,
- ServiceContext* serviceContext);
- ~PriorityTicketHolder() override{};
+ SimplePriorityTicketQueue(int lowPriorityBypassThreshold)
+ : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {}
- int32_t available() const override final {
- return _ticketsAvailable.load();
- };
+ bool empty() const final {
+ return _normal.empty() && _low.empty();
+ }
- int64_t queued() const override final {
- int64_t result = 0;
- for (const auto& queue : _brokers) {
- result += queue.waitingThreadsRelaxed();
+ void push(std::shared_ptr<TicketWaiter> val) final {
+ if (val->context->getPriority() == AdmissionContext::Priority::kLow) {
+ _low.push(std::move(val));
+ return;
}
- return result;
+ invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal);
+ _normal.push(std::move(val));
}
- int64_t numFinishedProcessing() const override final;
+ std::shared_ptr<TicketWaiter> pop() final {
+ if (!_normal.empty() && !_low.empty() && _lowPriorityBypassThreshold.load() > 0 &&
+ _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) {
+ auto front = std::move(_low.front());
+ _low.pop();
+ _expeditedLowPriorityAdmissions.addAndFetch(1);
+ return front;
+ }
+ if (!_normal.empty()) {
+ auto front = std::move(_normal.front());
+ _normal.pop();
+ return front;
+ }
+ auto front = std::move(_low.front());
+ _low.pop();
+ return front;
+ }
/**
* Number of times low priority operations are expedited for ticket admission over normal
@@ -87,13 +107,61 @@ public:
*/
std::int64_t bypassed() const {
return _lowPriorityBypassCount.loadRelaxed();
- };
+ }
- void updateLowPriorityAdmissionBypassThreshold(const int32_t& newBypassThreshold);
+ void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) {
+ _lowPriorityBypassThreshold.store(newBypassThreshold);
+ }
private:
- enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, NumQueues = 2 };
+ /**
+ * Limits the number times the low priority queue is non-empty and bypassed in favor of the
+ * normal priority queue for the next ticket admission.
+ */
+ AtomicWord<std::int32_t> _lowPriorityBypassThreshold;
+
+ /**
+ * Number of times ticket admission is expedited for low priority operations.
+ */
+ AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
+
+ /**
+ * Counts the number of times normal operations are dequeued over operations queued in the low
+ * priority queue. We explicitly use an unsigned type here because rollover is desired.
+ */
+ AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+
+ std::queue<std::shared_ptr<TicketWaiter>> _normal;
+ std::queue<std::shared_ptr<TicketWaiter>> _low;
+};
+
+/**
+ * A PriorityTicketHolder supports queueing and prioritization of operations based on
+ * AdmissionContext::Priority.
+ *
+ * MODIFICATIONS TO THIS CLASS MUST BE ACCOMPANIED BY AN UPDATE OF
+ * src/mongo/tla_plus/PriorityTicketHolder/MCPriorityTicketHolder.tla TO ENSURE IT IS CORRECT
+ */
+class PriorityTicketHolder : public TicketHolder {
+public:
+ explicit PriorityTicketHolder(int32_t numTickets,
+ int32_t lowPriorityBypassThreshold,
+ ServiceContext* serviceContext);
+ ~PriorityTicketHolder() override{};
+
+ int32_t available() const override final;
+
+ int64_t queued() const override final;
+
+ int64_t numFinishedProcessing() const override final;
+
+ std::int64_t expedited() const;
+ std::int64_t bypassed() const;
+
+ void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold);
+
+private:
boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
@@ -108,39 +176,6 @@ private:
void _appendImplStats(BSONObjBuilder& b) const override final;
- bool _tryAcquireTicket();
-
- TicketBroker::WaitingResult _attemptToAcquireTicket(TicketBroker& ticketBroker,
- Date_t deadline,
- Milliseconds maxWaitTime);
-
- /**
- * Wakes up a waiting thread (if it exists) and hands-over the current ticket.
- * Implementors MUST wake at least one waiting thread if at least one thread is waiting in any
- * of the brokers. In other words, attemptToTransferTicket on each non-empty TicketBroker must
- * be called until either it returns true at least once or has been called on all brokers.
- *
- * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block.
- * We risk stalling all other operations otherwise.
- *
- * When called the following invariants will be held:
- * - Successive checks to the number of waiting threads in a TicketBroker will always be <= the
- * previous value. That is, no new waiters can come in.
- * - Calling TicketBroker::attemptToTransferTicket will always return false if it has previously
- * returned false. Successive calls can change the result from true to false, but never the
- * reverse.
- */
- bool _dequeueWaitingThread(const stdx::unique_lock<stdx::mutex>& growthLock);
-
- static unsigned int _enumToInt(QueueType queueType) {
- return static_cast<unsigned int>(queueType);
- }
-
- TicketBroker& _getBroker(QueueType queueType) {
- return _brokers[_enumToInt(queueType)];
- }
-
-
static QueueType _getQueueType(const AdmissionContext* admCtx) {
auto priority = admCtx->getPriority();
switch (priority) {
@@ -153,35 +188,14 @@ private:
}
}
- // This mutex is meant to be used in order to grow the queue or to prevent it from doing so.
- //
- // We use an stdx::mutex here because we want to minimize overhead as much as possible. Using a
- // normal mongo::Mutex would add some unnecessary metrics counters. Additionally we need this
- // type as it is part of the TicketBroker API in order to avoid misuse.
- stdx::mutex _growthMutex; // NOLINT
+ static unsigned int _enumToInt(QueueType queueType) {
+ return static_cast<std::underlying_type_t<QueueType>>(queueType);
+ }
- std::array<TicketBroker, static_cast<unsigned int>(QueueType::NumQueues)> _brokers;
std::array<QueueStats, static_cast<unsigned int>(QueueType::NumQueues)> _stats;
- /**
- * Limits the number times the low priority queue is non-empty and bypassed in favor of the
- * normal priority queue for the next ticket admission.
- *
- * Updates must be done under the _growthMutex.
- */
- int32_t _lowPriorityBypassThreshold;
-
- /**
- * Counts the number of times normal operations are dequeued over operations queued in the low
- * priority queue. We explicitly use an unsigned type here because rollover is desired.
- */
- AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+ std::unique_ptr<TicketPool> _pool;
- /**
- * Number of times ticket admission is expedited for low priority operations.
- */
- AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
- AtomicWord<int32_t> _ticketsAvailable;
ServiceContext* _serviceContext;
};
} // namespace mongo
diff --git a/src/mongo/util/concurrency/priority_ticketholder_test.cpp b/src/mongo/util/concurrency/priority_ticketholder_test.cpp
index e5cfa66891c..fff70a982ec 100644
--- a/src/mongo/util/concurrency/priority_ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/priority_ticketholder_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/priority_ticketholder.h"
#include "mongo/util/concurrency/ticketholder_test_fixture.h"
+#include "mongo/util/periodic_runner_factory.h"
#include "mongo/util/tick_source_mock.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
@@ -75,31 +76,42 @@ void assertSoon(std::function<bool()> predicate, Milliseconds timeout = kWaitTim
}
}
-class PriorityTicketHolderTest : public TicketHolderTestFixture {};
+class PriorityTicketHolderTest : public TicketHolderTestFixture {
+public:
+ void setUp() override {
+ TicketHolderTestFixture::setUp();
+
+ auto tickSource = std::make_unique<TickSourceMock<Microseconds>>();
+ _tickSource = tickSource.get();
+ getServiceContext()->setTickSource(std::move(tickSource));
+ }
+
+ TickSourceMock<Microseconds>* getTickSource() {
+ return _tickSource;
+ }
+
+private:
+ TickSourceMock<Microseconds>* _tickSource;
+};
TEST_F(PriorityTicketHolderTest, BasicTimeoutPriority) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
basicTimeout(_opCtx.get(),
- std::make_unique<PriorityTicketHolder>(
- 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext));
+ std::make_unique<PriorityTicketHolder>(1 /* tickets */,
+ kDefaultLowPriorityAdmissionBypassThreshold,
+ getServiceContext()));
}
TEST_F(PriorityTicketHolderTest, ResizeStatsPriority) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
-
resizeTest(_opCtx.get(),
- std::make_unique<PriorityTicketHolder>(
- 1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext),
- tickSource);
+ std::make_unique<PriorityTicketHolder>(1 /* tickets */,
+ kDefaultLowPriorityAdmissionBypassThreshold,
+ getServiceContext()),
+ getTickSource());
}
TEST_F(PriorityTicketHolderTest, PriorityTwoQueuedOperations) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(
+ 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext());
Stats stats(&holder);
@@ -170,9 +182,8 @@ TEST_F(PriorityTicketHolderTest, PriorityTwoQueuedOperations) {
TEST_F(PriorityTicketHolderTest, OnlyLowPriorityOps) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(
+ 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext());
Stats stats(&holder);
// This mutex is to avoid data race conditions between checking for the ticket state and setting
@@ -289,9 +300,8 @@ TEST_F(PriorityTicketHolderTest, OnlyLowPriorityOps) {
}
TEST_F(PriorityTicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(
+ 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext());
Stats stats(&holder);
{
@@ -377,10 +387,8 @@ TEST_F(PriorityTicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
}
TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(
+ 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext());
Stats stats(&holder);
MockAdmission lowPriorityAdmission(this->getServiceContext(), AdmissionContext::Priority::kLow);
@@ -416,7 +424,7 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) {
});
}
- tickSource->advance(Microseconds(100));
+ getTickSource()->advance(Microseconds(100));
lowPriorityAdmission.ticket.reset();
while (holder.queued() > 0) {
@@ -424,7 +432,7 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) {
}
barrierAcquiredTicket.countDownAndWait();
- tickSource->advance(Microseconds(200));
+ getTickSource()->advance(Microseconds(200));
barrierReleaseTicket.countDownAndWait();
waiting.join();
@@ -473,10 +481,8 @@ TEST_F(PriorityTicketHolderTest, PriorityBasicMetrics) {
}
TEST_F(PriorityTicketHolderTest, PriorityCanceled) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
- auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
- PriorityTicketHolder holder(1, kDefaultLowPriorityAdmissionBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(
+ 1 /* tickets */, kDefaultLowPriorityAdmissionBypassThreshold, getServiceContext());
Stats stats(&holder);
{
MockAdmission lowPriorityAdmission(this->getServiceContext(),
@@ -497,7 +503,7 @@ TEST_F(PriorityTicketHolderTest, PriorityCanceled) {
// Wait for thread to take ticket.
}
- tickSource->advance(Microseconds(100));
+ getTickSource()->advance(Microseconds(100));
waiting.join();
}
@@ -532,10 +538,8 @@ TEST_F(PriorityTicketHolderTest, PriorityCanceled) {
}
TEST_F(PriorityTicketHolderTest, LowPriorityExpedited) {
- ServiceContext serviceContext;
- serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
auto lowPriorityBypassThreshold = 2;
- PriorityTicketHolder holder(1, lowPriorityBypassThreshold, &serviceContext);
+ PriorityTicketHolder holder(1 /* tickets */, lowPriorityBypassThreshold, getServiceContext());
Stats stats(&holder);
// Use the GlobalServiceContext to create MockAdmissions.
@@ -617,4 +621,10 @@ TEST_F(PriorityTicketHolderTest, LowPriorityExpedited) {
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), queuedNormalAdmissionsCount + 1);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
}
+
+TEST_F(PriorityTicketHolderTest, Interruption) {
+ interruptTest(_opCtx.get(),
+ std::make_unique<PriorityTicketHolder>(1 /* tickets */, 0, getServiceContext()));
+}
+
} // namespace
diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.cpp b/src/mongo/util/concurrency/semaphore_ticketholder.cpp
index 1099b9d7cb6..31969a6ad11 100644
--- a/src/mongo/util/concurrency/semaphore_ticketholder.cpp
+++ b/src/mongo/util/concurrency/semaphore_ticketholder.cpp
@@ -82,7 +82,7 @@ void tsFromDate(const Date_t& deadline, struct timespec& ts) {
} // namespace
SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext)
- : TicketHolderWithQueueingStats(numTickets, serviceContext) {
+ : TicketHolder(numTickets, serviceContext) {
check(sem_init(&_sem, 0, numTickets));
}
@@ -160,7 +160,7 @@ void SemaphoreTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept {
#else
SemaphoreTicketHolder::SemaphoreTicketHolder(int32_t numTickets, ServiceContext* svcCtx)
- : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {}
+ : TicketHolder(numTickets, svcCtx), _numTickets(numTickets) {}
SemaphoreTicketHolder::~SemaphoreTicketHolder() = default;
diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.h b/src/mongo/util/concurrency/semaphore_ticketholder.h
index d2e6ef1f257..45e5d80d7cc 100644
--- a/src/mongo/util/concurrency/semaphore_ticketholder.h
+++ b/src/mongo/util/concurrency/semaphore_ticketholder.h
@@ -46,7 +46,7 @@
namespace mongo {
-class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats {
+class SemaphoreTicketHolder final : public TicketHolder {
public:
explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext);
~SemaphoreTicketHolder() override final;
diff --git a/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp b/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp
index 241e6f622e8..d8cc00f4ef5 100644
--- a/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/semaphore_ticketholder_test.cpp
@@ -50,4 +50,12 @@ TEST_F(SemaphoreTicketHolderTest, ResizeStatsSemaphore) {
resizeTest(
_opCtx.get(), std::make_unique<SemaphoreTicketHolder>(1, &serviceContext), tickSource);
}
+
+TEST_F(SemaphoreTicketHolderTest, Interruption) {
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ interruptTest(_opCtx.get(),
+ std::make_unique<SemaphoreTicketHolder>(1 /* tickets */, getServiceContext()));
+}
+
} // namespace
diff --git a/src/mongo/util/concurrency/ticket_broker.h b/src/mongo/util/concurrency/ticket_broker.h
deleted file mode 100644
index fbff94d06e2..00000000000
--- a/src/mongo/util/concurrency/ticket_broker.h
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#pragma once
-
-#include "mongo/platform/atomic_word.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-/**
- * A ticket broker between threads waiting (Ticket-waiters) and threads willing to give a ticket
- * (Ticket-releasers).
- *
- * This broker requires external synchronisation (growthLock) so that it can function correctly. The
- * methods that require synchronisation have their signature written so that it enforces correct
- * usage. Using it with a different mutex in two separate calls is undefined behaviour and will
- * almost certainly cause a deadlock or segmentation fault.
- *
- * This class is to be used when more than one broker is necessary in a given scope and certain
- * guarantees have to be made. Given the following conditions for usage:
- *
- * - Ticket-waiters must acquire the growthLock in order to enter the broker.
- * - Ticket-releasers must acquire the growthLock in order to transfer their ticket to a waiter in
- * the broker.
- *
- * Then this class can provide the following guarantees across the multiple instances of it in
- * scope:
- *
- * - Ticket-releasers can attempt to transfer their ticket to each broker only once. No other waiter
- * will appear between attempts.
- * - Ticket-waiters will get scheduled for execution once transferred a ticket without having to
- * acquire any mutex.
- *
- * This is useful for example if you need to build a scheduler based on top of multiple brokers
- * representing different groups of operations. The ticket-releasers can have "snapshot" guarantees
- * of the state of the system and choose who to transfer the ticket to based on some arbitrary
- * logic.
- *
- * Note that the implementation allows granular thread selection for wakeup but we've chosen to use
- * FIFO semantics to make the critical section as short lived as possible.
- */
-class TicketBroker {
-public:
- TicketBroker();
-
- struct WaitingResult {
- bool hasTimedOut;
- bool hasTicket;
- };
-
- /**
- * Attempts to wait for a ticket until it reaches the specified timeout. The return type will
- * contain whether the attempt was successful and/or if it timed out.
- *
- * This method consumes the lock since it will internally unlock it. Only locking again if the
- * call times out in order to remove the thread from the waiting list.
- */
- WaitingResult attemptWaitForTicketUntil(stdx::unique_lock<stdx::mutex> growthLock,
- Date_t until) noexcept;
-
- /**
- * Transfers the ticket if there is a thread to transfer it to. Returns true if the ticket was
- * transferred.
- *
- * Guarantee: No modifications to the internal linked list will take place while holding the
- * lock.
- */
- bool attemptToTransferTicket(const stdx::unique_lock<stdx::mutex>& growthLock) noexcept;
-
- /**
- * Returns the number of threads waiting.
- *
- *
- * This value will be consistent while holding the growthLock.
- */
- int32_t waitingThreads(const stdx::unique_lock<stdx::mutex>& growthLock) const noexcept {
- return _numQueued.load();
- }
-
- /**
- * Same as 'waitingThreads()' but without strict ordering and consistency guarantees.
- *
- * This method is meant for monitoring and tests only. The value is a snapshot of the system at
- * the moment of calling the method. It will potentially be out of date as soon as it returns.
- */
- int32_t waitingThreadsRelaxed() const noexcept {
- return _numQueued.loadRelaxed();
- }
-
-private:
- /**
- * Node structure of the linked list, it has to be a doubly linked list in order to allow
- * random removal of nodes. Lifetime of these nodes will reside in the stack memory of the
- * thread waiting.
- */
- struct Node {
- Node* previous{nullptr};
- AtomicWord<uint32_t> futexWord{0};
- Node* next{nullptr};
- };
-
- // Append the node to the linked list.
- void _registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept;
-
- // Removes the node from the linked list.
- void _unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept;
-
- /**
- * Edge nodes of the linked list. To append we need to know the end of the list in order to make
- * appends O(1) instead of O(n).
- */
- Node* _queueBegin;
- Node* _queueEnd;
-
- /**
- * Number of queued threads in the linked list.
- */
- AtomicWord<int32_t> _numQueued;
-};
-
-} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_broker.cpp b/src/mongo/util/concurrency/ticket_pool.cpp
index 1e9c4ba403c..dfda5ebcf00 100644
--- a/src/mongo/util/concurrency/ticket_broker.cpp
+++ b/src/mongo/util/concurrency/ticket_pool.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2022-present MongoDB, Inc.
+ * Copyright (C) 2023-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,18 +27,21 @@
* it in the license file.
*/
-#include "mongo/util/concurrency/ticket_broker.h"
-#include "mongo/logv2/log.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/util/errno_util.h"
-
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+#include "ticket_pool.h"
+
// TODO SERVER-72616: Remove futex usage from this class in favour of atomic waits.
#include <linux/futex.h> /* Definition of FUTEX_* constants */
#include <sys/syscall.h> /* Definition of SYS_* constants */
#include <unistd.h>
+#include "mongo/logv2/log.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/errno_util.h"
+
namespace mongo {
namespace {
static int futex(uint32_t* uaddr,
@@ -108,115 +111,87 @@ static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept {
}
} // namespace
-TicketBroker::TicketBroker() : _queueBegin(nullptr), _queueEnd(nullptr), _numQueued(0) {}
-
-void TicketBroker::_registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
- Node& node) noexcept {
- // We register the node.
- _numQueued.fetchAndAdd(1);
-
- if (_queueBegin == nullptr) {
- // If the list is empty we are the first node.
- _queueBegin = &node;
- _queueEnd = &node;
- } else {
- // Otherwise we're the new end and must link the preceding node to us, and us to the
- // preceding node.
- _queueEnd->next = &node;
- node.previous = _queueEnd;
- _queueEnd = &node;
+TicketPool::TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue)
+ : _available(numTickets), _queued(0), _waiters(std::move(queue)) {}
+
+bool TicketPool::tryAcquire() {
+ auto available = _available.load();
+ bool gotTicket = false;
+ while (available > 0 && !gotTicket) {
+ gotTicket = _available.compareAndSwap(&available, available - 1);
}
+ return gotTicket;
}
-void TicketBroker::_unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
- Node& node) noexcept {
- // We've been unregistered by a ticket transfer, nothing to do as the transferer already removed
- // us.
- if (node.futexWord.load() != 0) {
- return;
- }
+bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) {
+ auto waiter = std::make_shared<TicketWaiter>();
+ waiter->context = admCtx;
- auto previousLength = _numQueued.fetchAndSubtract(1);
- // If there was only 1 node it was us, the queue will now be empty.
- if (previousLength == 1) {
- _queueBegin = _queueEnd = nullptr;
- return;
- }
- // If the beginning of the linked list is this node we advance it to the next element.
- if (_queueBegin == &node) {
- _queueBegin = node.next;
- node.next->previous = nullptr;
- return;
+ {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ // It is important to check for a ticket one more time before queueing, as a ticket may have
+ // just become available.
+ if (tryAcquire()) {
+ return true;
+ }
+ _queued.addAndFetch(1);
+ _waiters->push(waiter);
}
- // If the end of the queue is this node, then the new end is the preceding node.
- if (_queueEnd == &node) {
- _queueEnd = node.previous;
- node.previous->next = nullptr;
- return;
+ auto res = atomic_wait(waiter->futexWord, TicketWaiter::State::Waiting, deadline);
+ if (res == stdx::cv_status::timeout) {
+ // If we timed out, we need to invalidate ourselves, but ensure that we take a ticket if
+ // it was given.
+ auto state = static_cast<uint32_t>(TicketWaiter::State::Waiting);
+ if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::TimedOut)) {
+ // Successfully set outselves to timed out so nobody tries to give us a ticket.
+ return false;
+ } else {
+ // We were given a ticket anyways. We must take it.
+ invariant(state == TicketWaiter::State::Acquired);
+ return true;
+ }
}
-
- // Otherwise we're in the middle of the list. Preceding and successive nodes must be updated
- // accordingly.
- node.previous->next = node.next;
- node.next->previous = node.previous;
+ invariant(waiter->futexWord.load() == TicketWaiter::State::Acquired);
+ return true;
}
-TicketBroker::WaitingResult TicketBroker::attemptWaitForTicketUntil(
- stdx::unique_lock<stdx::mutex> growthLock, Date_t until) noexcept {
- // Stack allocate the node of the linked list, this approach lets us ignore heap memory in
- // favour of stack memory which is dramatically cheaper. Care must be taken to ensure that there
- // are no references left in the queue to this node once returning from the method.
- //
- // If std::promise didn't perform a heap allocation we could use it here.
- Node node;
-
- // We add ourselves as a waiter, we are still holding the lock here.
- _registerAsWaiter(growthLock, node);
-
- // Finished modifying the linked list, the lock can be released now.
- growthLock.unlock();
-
- // We now wait until obtaining the notification via the futex word.
- auto waitResult = atomic_wait(node.futexWord, 0, until);
- bool hasTimedOut = waitResult == stdx::cv_status::timeout;
-
- if (hasTimedOut) {
- // Timing out implies that the node must be removed from the list, block list modifications
- // to prevent segmentation faults.
- growthLock.lock();
- _unregisterAsWaiter(growthLock, node);
- growthLock.unlock();
+std::shared_ptr<TicketWaiter> TicketPool::_popWaiterOrAddTicketToPool() {
+ stdx::unique_lock<Mutex> lock(_mutex);
+ if (_waiters->empty()) {
+ // We need to ensure we add the ticket back to the pool while holding the mutex. This
+ // prevents a soon-to-be waiter from missing an available ticket. Otherwise, we could
+ // leave a waiter in the queue without ever waking it.
+ _available.addAndFetch(1);
+ return nullptr;
}
+ auto waiter = _waiters->pop();
+ _queued.subtractAndFetch(1);
+ return waiter;
+}
- // If we haven't timed out it means that the ticket has been transferred to our node. The
- // transfer method removes the node from the linked list, so there's no cleanup to be done.
-
- auto hasTicket = node.futexWord.load() != 0;
+void TicketPool::_release() {
+ while (auto waiter = _popWaiterOrAddTicketToPool()) {
+ auto state = static_cast<uint32_t>(TicketWaiter::State::Waiting);
+ if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::Acquired)) {
+ atomic_notify_one(waiter->futexWord);
+ return;
+ } else {
+ // We raced with the waiter timing out, so we didn't transfer the ticket. Try again.
+ invariant(state == TicketWaiter::State::TimedOut);
+ }
+ }
+}
- return TicketBroker::WaitingResult{hasTimedOut, hasTicket};
+void TicketPool::release() {
+ _release();
}
-bool TicketBroker::attemptToTransferTicket(
- const stdx::unique_lock<stdx::mutex>& growthLock) noexcept {
- // We can only transfer a ticket if there is a thread waiting for it.
- if (_numQueued.load() > 0) {
- _numQueued.fetchAndSubtract(1);
-
- // We notify the first element in the queue. To avoid race conditions we first remove the
- // node and then notify the waiting thread. Doing the opposite risks a segmentation fault if
- // the node gets deallocated before we remove it from the list.
- auto node = _queueBegin;
- _queueBegin = node->next;
- if (_queueBegin) {
- // Next node isn't empty, we must inform it that it's first in line.
- _queueBegin->previous = nullptr;
- }
- auto& futexAtomic = node->futexWord;
- futexAtomic.store(1);
- // We've transferred a ticket and removed the node from the list, inform the waiting thread
- // that it can proceed.
- atomic_notify_one(futexAtomic);
+bool TicketPool::releaseIfWaiters() {
+ // This is prone to race conditions, but is intended as a fast-path to avoid taking the mutex
+ // unnecessarily.
+ if (_queued.load()) {
+ _release();
return true;
}
return false;
diff --git a/src/mongo/util/concurrency/ticket_pool.h b/src/mongo/util/concurrency/ticket_pool.h
new file mode 100644
index 00000000000..4cdfaf9dce7
--- /dev/null
+++ b/src/mongo/util/concurrency/ticket_pool.h
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <list>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * A ticket waiter represents an operation that queues when no tickets are available.
+ */
+struct TicketWaiter {
+ enum State : uint32_t {
+ // This is the initial state. May transition to only Acquired or TimedOut.
+ Waiting = 0,
+ // A releaser will set the waiter to the Acquired state when a ticket is available. This is
+ // a terminal state.
+ Acquired,
+ // The waiter will transition to this state when it times out. Releasers will not give
+ // tickets to waiters in the TimedOut state. This is a terminal state.
+ TimedOut,
+ };
+ AtomicWord<uint32_t> futexWord{Waiting};
+
+ // Only valid to dereference when in the Waiting state and while holding the queue lock.
+ AdmissionContext* context{nullptr};
+};
+
+/**
+ * A TicketQueue is an interface that represents a queue of waiters whose ordering is
+ * implementation-defined.
+ */
+class TicketQueue {
+public:
+ virtual ~TicketQueue(){};
+ virtual bool empty() const = 0;
+ virtual void push(std::shared_ptr<TicketWaiter>) = 0;
+ virtual std::shared_ptr<TicketWaiter> pop() = 0;
+};
+
+/**
+ * The FifoTicketQueue is a simple FIFO queue where new waiters are placed at the end and the oldest
+ * waiters are removed first.
+ */
+class FifoTicketQueue : public TicketQueue {
+public:
+ bool empty() const {
+ return _queue.empty();
+ }
+
+ void push(std::shared_ptr<TicketWaiter> val) {
+ _queue.push_back(std::move(val));
+ }
+
+ std::shared_ptr<TicketWaiter> pop() {
+ auto front = std::move(_queue.front());
+ _queue.pop_front();
+ return front;
+ }
+
+private:
+ std::list<std::shared_ptr<TicketWaiter>> _queue;
+};
+
+/**
+ * A TicketPool holds tickets and queues waiters in the provided TicketQueue. The TicketPool
+ * attempts to emulate a semaphore with a custom queueing policy.
+ *
+ * All public functions are thread-safe except where explicitly stated otherwise.
+ */
+class TicketPool {
+public:
+ TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue);
+
+ /**
+ * Attempt to acquire a ticket without blocking. Returns true if a ticket was granted.
+ */
+ bool tryAcquire();
+
+ /**
+ * Acquire a ticket until the provided deadline. Returns false on timeout, true otherwise.
+ */
+ bool acquire(AdmissionContext* admCtx, Date_t deadline);
+
+ /**
+ * Releases a ticket to the pool. Will will wake a waiter, if there are any queued operations.
+ */
+ void release();
+
+ /**
+ * If there are queued operations, releases a ticket and returns true. Otherwise, does nothing
+ * and returns false.
+ */
+ bool releaseIfWaiters();
+
+ /**
+ * Returns the number of tickets available.
+ */
+ int32_t available() const {
+ return _available.load();
+ }
+
+ /**
+ * Returns the number of queued waiters.
+ */
+ int32_t queued() const {
+ return _queued.load();
+ }
+
+ /*
+ * Provides direct access to the underlying queue. Callers must ensure they only use thread-safe
+ * functions.
+ */
+ TicketQueue* getQueue() {
+ return _waiters.get();
+ }
+
+private:
+ /**
+ * Attempt to give the ticket to a waiter, and otherwise the pool.
+ */
+ void _release();
+
+ /**
+ * Removes the next waiter from the queue. If there are no waiters, adds the ticket to the pool.
+ * Ensures that no new waiters queue while this is happening.
+ */
+ std::shared_ptr<TicketWaiter> _popWaiterOrAddTicketToPool();
+
+ AtomicWord<int32_t> _available;
+
+ // This counter is redundant with the _waiters queue length, but provides the releaseIfWaiters()
+ // a fast-path that avoids taking the queue mutex.
+ AtomicWord<int32_t> _queued;
+
+ // This mutex protects the _waiters queue by preventing items from being added and removed, but
+ // does not protect the elements of the queue.
+ Mutex _mutex = MONGO_MAKE_LATCH("TicketPool::_mutex");
+ std::unique_ptr<TicketQueue> _waiters;
+};
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_broker_test.cpp b/src/mongo/util/concurrency/ticket_pool_test.cpp
index 20008a721bb..051ac3a3e00 100644
--- a/src/mongo/util/concurrency/ticket_broker_test.cpp
+++ b/src/mongo/util/concurrency/ticket_pool_test.cpp
@@ -35,7 +35,7 @@
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/ticket_broker.h"
+#include "mongo/util/concurrency/ticket_pool.h"
#include "mongo/util/duration.h"
#include "mongo/util/timer.h"
@@ -76,55 +76,37 @@ static inline const Milliseconds kSleepTime{1};
} \
}
-TEST(TicketBrokerTest, BasicTimeout) {
- TicketBroker broker;
-
- stdx::mutex brokerMutex; // NOLINT
+TEST(TicketPoolTest, BasicTimeout) {
+ TicketPool pool(0, std::make_unique<FifoTicketQueue>());
{
- stdx::unique_lock lk(brokerMutex);
- auto result =
- broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Milliseconds{10});
- ASSERT_FALSE(result.hasTicket);
- ASSERT_TRUE(result.hasTimedOut);
+ AdmissionContext ctx;
+ ASSERT_FALSE(pool.acquire(&ctx, Date_t::now() + Milliseconds{10}));
}
{
- stdx::unique_lock lk(brokerMutex);
- auto result = broker.attemptWaitForTicketUntil(std::move(lk), Date_t::min());
- ASSERT_FALSE(result.hasTicket);
- ASSERT_TRUE(result.hasTimedOut);
+ AdmissionContext admCtx;
+ ASSERT_FALSE(pool.acquire(&admCtx, Date_t::min()));
}
}
-TEST(TicketBrokerTest, HandOverWorks) {
- TicketBroker broker;
-
- stdx::mutex brokerMutex; // NOLINT
+TEST(TicketPoolTest, HandOverWorks) {
+ TicketPool pool(0, std::make_unique<FifoTicketQueue>());
{
- {
- stdx::unique_lock growthLock(brokerMutex);
- ASSERT_FALSE(broker.attemptToTransferTicket(growthLock));
- }
+ ASSERT_FALSE(pool.tryAcquire());
stdx::thread waitingThread([&] {
- stdx::unique_lock lk(brokerMutex);
- auto result =
- broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10});
- ASSERT_TRUE(result.hasTicket);
- ASSERT_FALSE(result.hasTimedOut);
+ AdmissionContext ctx;
+ ASSERT_TRUE(pool.acquire(&ctx, Date_t::now() + Seconds{10}));
});
assertSoon([&] {
- ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 1);
+ ASSERT_SOON_EXP(pool.queued() == 1);
return true;
});
- {
- stdx::unique_lock growthLock(brokerMutex);
- ASSERT_TRUE(broker.attemptToTransferTicket(growthLock));
- }
+ { ASSERT_TRUE(pool.releaseIfWaiters()); }
waitingThread.join();
}
@@ -136,23 +118,19 @@ TEST(TicketBrokerTest, HandOverWorks) {
for (int i = 0; i < threadsToTest; i++) {
threads.emplace_back([&] {
- stdx::unique_lock lk(brokerMutex);
- auto result =
- broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10});
- ASSERT_TRUE(result.hasTicket);
- ASSERT_FALSE(result.hasTimedOut);
+ AdmissionContext ctx;
+ ASSERT_TRUE(pool.acquire(&ctx, Date_t::now() + Seconds{10}));
pendingThreads.subtractAndFetch(1);
});
}
assertSoon([&] {
- ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 10);
+ ASSERT_SOON_EXP(pool.queued() == 10);
return true;
});
for (int i = 1; i <= threadsToTest; i++) {
- stdx::unique_lock growthLock(brokerMutex);
- ASSERT_TRUE(broker.attemptToTransferTicket(growthLock));
+ ASSERT_TRUE(pool.releaseIfWaiters());
assertSoon([&] {
ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i);
return true;
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index d755b665626..03ef2ec90a4 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -43,7 +43,7 @@ namespace mongo {
namespace {
void updateQueueStatsOnRelease(ServiceContext* serviceContext,
- TicketHolderWithQueueingStats::QueueStats& queueStats,
+ TicketHolder::QueueStats& queueStats,
AdmissionContext* admCtx) {
queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
auto startTime = admCtx->getStartProcessingTime();
@@ -53,7 +53,7 @@ void updateQueueStatsOnRelease(ServiceContext* serviceContext,
}
void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext,
- TicketHolderWithQueueingStats::QueueStats& queueStats,
+ TicketHolder::QueueStats& queueStats,
AdmissionContext* admCtx) {
if (admCtx->getAdmissions() == 0) {
queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
@@ -63,34 +63,34 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext,
}
} // namespace
-void TicketHolderWithQueueingStats::resize(int32_t newSize) noexcept {
+void TicketHolder::resize(int32_t newSize) noexcept {
stdx::lock_guard<Latch> lk(_resizeMutex);
_resize(newSize, _outof.load());
_outof.store(newSize);
}
-void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const {
+void TicketHolder::appendStats(BSONObjBuilder& b) const {
b.append("out", used());
b.append("available", available());
b.append("totalTickets", outof());
+ b.append("immediatePriorityAdmissionsCount", getImmediatePriorityAdmissionsCount());
_appendImplStats(b);
}
-void TicketHolderWithQueueingStats::_releaseToTicketPool(AdmissionContext* admCtx) noexcept {
+void TicketHolder::_releaseToTicketPool(AdmissionContext* admCtx) noexcept {
auto& queueStats = _getQueueStatsToUse(admCtx);
updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx);
_releaseToTicketPoolImpl(admCtx);
}
-Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx,
- AdmissionContext* admCtx) {
+Ticket TicketHolder::waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) {
auto res = waitForTicketUntil(opCtx, admCtx, Date_t::max());
invariant(res);
return std::move(*res);
}
-boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionContext* admCtx) {
+boost::optional<Ticket> TicketHolder::tryAcquire(AdmissionContext* admCtx) {
// 'kImmediate' operations should always bypass the ticketing system.
invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
auto ticket = _tryAcquireImpl(admCtx);
@@ -104,9 +104,9 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionConte
}
-boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until) {
+boost::optional<Ticket> TicketHolder::waitForTicketUntil(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until) {
invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
// Attempt a quick acquisition first.
@@ -145,11 +145,11 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
}
}
-int32_t TicketHolderWithQueueingStats::getAndResetPeakUsed() {
+int32_t TicketHolder::getAndResetPeakUsed() {
return _peakUsed.swap(used());
}
-void TicketHolderWithQueueingStats::_updatePeakUsed() {
+void TicketHolder::_updatePeakUsed() {
if (!feature_flags::gFeatureFlagExecutionControl.isEnabledAndIgnoreFCV()) {
return;
}
@@ -161,8 +161,7 @@ void TicketHolderWithQueueingStats::_updatePeakUsed() {
}
}
-void TicketHolderWithQueueingStats::_appendCommonQueueImplStats(BSONObjBuilder& b,
- const QueueStats& stats) const {
+void TicketHolder::_appendCommonQueueImplStats(BSONObjBuilder& b, const QueueStats& stats) const {
auto removed = stats.totalRemovedQueue.loadRelaxed();
auto added = stats.totalAddedQueue.loadRelaxed();
@@ -195,6 +194,16 @@ boost::optional<Ticket> MockTicketHolder::waitForTicketUntil(OperationContext*,
return {};
}
+boost::optional<Ticket> MockTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
+ return boost::none;
+}
+
+boost::optional<Ticket> MockTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until) {
+ return boost::none;
+}
+
void MockTicketHolder::setUsed(int32_t used) {
_used = used;
if (_used > _peakUsed) {
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index e2325120385..6eb66bbc0b5 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -53,25 +53,27 @@ class TicketHolder {
friend class Ticket;
public:
+ TicketHolder(int32_t numTickets, ServiceContext* svcCtx)
+ : _outof(numTickets), _serviceContext(svcCtx){};
virtual ~TicketHolder(){};
/**
* Adjusts the total number of tickets allocated for the ticket pool to 'newSize'.
*/
- virtual void resize(int32_t newSize) noexcept {};
+ virtual void resize(int32_t newSize) noexcept;
/**
* Attempts to acquire a ticket without blocking.
- * Returns a boolean indicating whether the operation was successful or not.
+ * Returns a ticket if one is available, and boost::none otherwise.
*/
- virtual boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) = 0;
+ virtual boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx);
/**
* Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext
* 'opCtx' is killed, throwing an AssertionException. If no OperationContext is provided, then
* the operation is uninterruptible.
*/
- virtual Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) = 0;
+ virtual Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx);
/**
* Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is
@@ -81,96 +83,66 @@ public:
*/
virtual boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx,
AdmissionContext* admCtx,
- Date_t until) = 0;
-
- virtual void appendStats(BSONObjBuilder& b) const = 0;
-
- /**
- * 'Immediate' admissions don't need to acquire or wait for a ticket. However, they should
- * report to the TicketHolder for tracking purposes.
- *
- * Increments the count of 'immediate' priority admissions reported.
- */
- virtual void reportImmediatePriorityAdmission() = 0;
+ Date_t until);
/**
- * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket
- * pool.
+ * The total number of tickets allotted to the ticket pool.
*/
- virtual int32_t available() const = 0;
+ virtual int32_t outof() const {
+ return _outof.loadRelaxed();
+ }
/**
* Instantaneous number of tickets that are checked out by an operation.
*/
- virtual int32_t used() const = 0;
+ virtual int32_t used() const {
+ return outof() - available();
+ }
/**
* Peak number of tickets checked out at once since the previous time this function was called.
*/
- virtual int32_t getAndResetPeakUsed() = 0;
+ virtual int32_t getAndResetPeakUsed();
/**
- * The total number of tickets allotted to the ticket pool.
+ * 'Immediate' admissions don't need to acquire or wait for a ticket. However, they should
+ * report to the TicketHolder for tracking purposes.
+ *
+ * Increments the count of 'immediate' priority admissions reported.
*/
- virtual int32_t outof() const = 0;
+ virtual void reportImmediatePriorityAdmission() {
+ _immediatePriorityAdmissionsCount.fetchAndAdd(1);
+ }
/**
- * The total number of operations that acquired a ticket, completed their work, and released the
- * ticket.
+ * Returns the number of 'immediate' priority admissions, which always bypass ticket
+ * acquisition.
*/
- virtual int64_t numFinishedProcessing() const = 0;
+ int64_t getImmediatePriorityAdmissionsCount() const {
+ return _immediatePriorityAdmissionsCount.loadRelaxed();
+ }
+
+ virtual void appendStats(BSONObjBuilder& b) const;
-private:
/**
- * Releases a ticket back into the ticketing pool.
+ * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket
+ * pool.
*/
- virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept = 0;
-};
-
-/**
- * A ticketholder which manages both aggregate and policy specific queueing statistics.
- */
-class TicketHolderWithQueueingStats : public TicketHolder {
-public:
- TicketHolderWithQueueingStats(int32_t numTickets, ServiceContext* svcCtx)
- : _outof(numTickets), _serviceContext(svcCtx){};
-
- ~TicketHolderWithQueueingStats() override{};
-
- boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override;
-
- Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx) override;
-
- boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until) override;
+ virtual int32_t available() const = 0;
/**
- * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'.
+ * Instantaneous number of operations waiting in queue for a ticket.
+ *
+ * TODO SERVER-74082: Once the SemaphoreTicketHolder is removed, consider changing this metric
+ * to int32_t.
*/
- void resize(int32_t newSize) noexcept override;
-
- int32_t used() const override {
- return outof() - available();
- }
-
- int32_t outof() const override {
- return _outof.loadRelaxed();
- }
+ virtual int64_t queued() const = 0;
/**
- * Returns the number of 'immediate' priority admissions, which always bypass ticket
- * acquisition.
+ * The total number of operations that acquired a ticket, completed their work, and released the
+ * ticket.
*/
- int64_t getImmediatePriorityAdmissionsCount() const {
- return _immediatePriorityAdmissionsCount.loadRelaxed();
- }
-
- void reportImmediatePriorityAdmission() override final {
- _immediatePriorityAdmissionsCount.fetchAndAdd(1);
- }
-
- void appendStats(BSONObjBuilder& b) const override;
+ virtual int64_t numFinishedProcessing() const = 0;
/**
* Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a
@@ -188,18 +160,13 @@ public:
AtomicWord<std::int64_t> totalTimeQueuedMicros{0};
};
- int32_t getAndResetPeakUsed() override;
-
+private:
/**
- * Instantaneous number of operations waiting in queue for a ticket.
- *
- * TODO SERVER-74082: Once the SemaphoreTicketHolder is removed, consider changing this metric
- * to int32_t.
+ * Releases a ticket back into the ticketing pool.
*/
- virtual int64_t queued() const = 0;
+ virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept;
-private:
- void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final;
+ virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0;
virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0;
@@ -207,11 +174,9 @@ private:
AdmissionContext* admCtx,
Date_t until) = 0;
- virtual void _appendImplStats(BSONObjBuilder& b) const = 0;
+ virtual void _appendImplStats(BSONObjBuilder& b) const {}
- virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0;
-
- virtual void _resize(int32_t newSize, int32_t oldSize) noexcept = 0;
+ virtual void _resize(int32_t newSize, int32_t oldSize) noexcept {}
/**
* Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue
@@ -221,8 +186,8 @@ private:
void _updatePeakUsed();
- Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2),
- "TicketHolderWithQueueingStats::_resizeMutex");
+ Mutex _resizeMutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "TicketHolder::_resizeMutex");
AtomicWord<int32_t> _outof;
AtomicWord<int32_t> _peakUsed;
AtomicWord<std::int64_t> _immediatePriorityAdmissionsCount;
@@ -238,6 +203,8 @@ protected:
class MockTicketHolder : public TicketHolder {
public:
+ MockTicketHolder() : TicketHolder(0, nullptr) {}
+
void resize(int32_t newSize) noexcept override {
_outof = newSize;
}
@@ -274,15 +241,32 @@ public:
_outof = outof;
}
+ int64_t queued() const override {
+ return 0;
+ }
+
int64_t numFinishedProcessing() const override {
return _numFinishedProcessing;
}
+
void setNumFinishedProcessing(int32_t numFinishedProcessing) {
_numFinishedProcessing = numFinishedProcessing;
}
private:
- void _releaseToTicketPool(AdmissionContext*) noexcept override {}
+ void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override {}
+
+ boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override;
+
+ boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until) override;
+
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override {
+ return _stats;
+ }
+
+ QueueStats _stats;
int32_t _used = 0;
int32_t _peakUsed = 0;
@@ -296,7 +280,6 @@ private:
*/
class Ticket {
friend class TicketHolder;
- friend class TicketHolderWithQueueingStats;
friend class SemaphoreTicketHolder;
friend class PriorityTicketHolder;
friend class MockTicketHolder;
diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp
index 04c5277a0a9..a3f1c767bdc 100644
--- a/src/mongo/util/concurrency/ticketholder_bm.cpp
+++ b/src/mongo/util/concurrency/ticketholder_bm.cpp
@@ -45,7 +45,7 @@
namespace mongo {
namespace {
-static int kTickets = 128;
+static int kTickets = 32;
static int kThreadMin = 16;
static int kThreadMax = 1024;
static int kLowPriorityAdmissionBypassThreshold = 100;
@@ -183,6 +183,7 @@ void BM_acquireAndRelease(benchmark::State& state) {
BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder, AdmissionsPriority::kNormal)
->Threads(kThreadMin)
->Threads(kTickets)
+ ->Threads(128)
->Threads(kThreadMax);
// TODO SERVER-72616: Remove ifdefs once PriorityTicketHolder is available cross-platform.
@@ -191,6 +192,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder, AdmissionsPriori
BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kNormal)
->Threads(kThreadMin)
->Threads(kTickets)
+ ->Threads(128)
->Threads(kThreadMax);
// Low priority operations are expected to take longer to acquire a ticket because they are forced
@@ -198,6 +200,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriorit
BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kLow)
->Threads(kThreadMin)
->Threads(kTickets)
+ ->Threads(128)
->Threads(kThreadMax);
// This benchmark is intended for comparisons between different iterations of the
@@ -209,6 +212,7 @@ BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriorit
BENCHMARK_TEMPLATE(BM_acquireAndRelease, PriorityTicketHolder, AdmissionsPriority::kNormalAndLow)
->Threads(kThreadMin)
->Threads(kTickets)
+ ->Threads(128)
->Threads(kThreadMax);
#endif
diff --git a/src/mongo/util/concurrency/ticketholder_test_fixture.cpp b/src/mongo/util/concurrency/ticketholder_test_fixture.cpp
index 539c9666fae..35057522e11 100644
--- a/src/mongo/util/concurrency/ticketholder_test_fixture.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test_fixture.cpp
@@ -42,7 +42,7 @@ void TicketHolderTestFixture::setUp() {
}
void TicketHolderTestFixture::basicTimeout(OperationContext* opCtx,
- std::unique_ptr<TicketHolderWithQueueingStats> holder) {
+ std::unique_ptr<TicketHolder> holder) {
ASSERT_EQ(holder->used(), 0);
ASSERT_EQ(holder->available(), 1);
ASSERT_EQ(holder->outof(), 1);
@@ -68,7 +68,7 @@ void TicketHolderTestFixture::basicTimeout(OperationContext* opCtx,
}
void TicketHolderTestFixture::resizeTest(OperationContext* opCtx,
- std::unique_ptr<TicketHolderWithQueueingStats> holder,
+ std::unique_ptr<TicketHolder> holder,
TickSourceMock<Microseconds>* tickSource) {
Stats stats(holder.get());
@@ -133,4 +133,25 @@ void TicketHolderTestFixture::resizeTest(OperationContext* opCtx,
ASSERT_FALSE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::now() + Milliseconds(1)));
}
+void TicketHolderTestFixture::interruptTest(OperationContext* opCtx,
+ std::unique_ptr<TicketHolder> holder) {
+ holder->resize(0);
+
+ auto waiter = stdx::thread([&]() {
+ AdmissionContext admCtx;
+ ASSERT_THROWS_CODE(holder->waitForTicketUntil(opCtx, &admCtx, Date_t::max()),
+ DBException,
+ ErrorCodes::Interrupted);
+ });
+
+ while (!holder->queued()) {
+ }
+
+ ASSERT_EQ(holder->used(), 0);
+ ASSERT_EQ(holder->available(), 0);
+
+ opCtx->markKilled();
+ waiter.join();
+}
+
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder_test_fixture.h b/src/mongo/util/concurrency/ticketholder_test_fixture.h
index 096a6a64da7..b358665918a 100644
--- a/src/mongo/util/concurrency/ticketholder_test_fixture.h
+++ b/src/mongo/util/concurrency/ticketholder_test_fixture.h
@@ -47,23 +47,28 @@ namespace mongo {
* target the legacy SemaphoreTicketHolder.
*/
class TicketHolderTestFixture : public ServiceContextTest {
+public:
void setUp() override;
protected:
class Stats;
struct MockAdmission;
- void basicTimeout(OperationContext* opCtx,
- std::unique_ptr<TicketHolderWithQueueingStats> holder);
+ void basicTimeout(OperationContext* opCtx, std::unique_ptr<TicketHolder> holder);
/**
* Tests that TicketHolder::resize() does not impact metrics outside of those related to the
* number of tickets available(), used(), and outof().
*/
void resizeTest(OperationContext* opCtx,
- std::unique_ptr<TicketHolderWithQueueingStats> holder,
+ std::unique_ptr<TicketHolder> holder,
TickSourceMock<Microseconds>* tickSource);
+ /**
+ * Tests that ticket acquisition is interruptible.
+ */
+ void interruptTest(OperationContext* opCtx, std::unique_ptr<TicketHolder> holder);
+
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
};