summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorLouis Williams <louis.williams@mongodb.com>2023-04-28 14:21:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-28 17:15:33 +0000
commit059bcbe2df11d12249e53263f28dd12dbc185f79 (patch)
treebc3e065d839f6fe9836ab42af646a8a7f63c35f6 /src/mongo
parentbcff3cee04973b3ab91b78a70ca6b5740f22e1a0 (diff)
downloadmongo-059bcbe2df11d12249e53263f28dd12dbc185f79.tar.gz
SERVER-76653 PriorityTicketholder perf improvements
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.cpp32
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.h89
-rw-r--r--src/mongo/util/concurrency/ticket_pool.cpp40
-rw-r--r--src/mongo/util/concurrency/ticket_pool.h124
-rw-r--r--src/mongo/util/concurrency/ticket_pool_test.cpp8
5 files changed, 142 insertions, 151 deletions
diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp
index aa8e581fcf2..4656c7ab4f4 100644
--- a/src/mongo/util/concurrency/priority_ticketholder.cpp
+++ b/src/mongo/util/concurrency/priority_ticketholder.cpp
@@ -45,18 +45,16 @@ namespace mongo {
PriorityTicketHolder::PriorityTicketHolder(int32_t numTickets,
int32_t lowPriorityBypassThreshold,
ServiceContext* serviceContext)
- : TicketHolder(numTickets, serviceContext), _serviceContext(serviceContext) {
-
- auto queue = std::make_unique<SimplePriorityTicketQueue>(lowPriorityBypassThreshold);
- _pool = std::make_unique<TicketPool>(numTickets, std::move(queue));
-}
+ : TicketHolder(numTickets, serviceContext),
+ _serviceContext(serviceContext),
+ _pool(numTickets, lowPriorityBypassThreshold) {}
int32_t PriorityTicketHolder::available() const {
- return _pool->available();
+ return _pool.available();
}
int64_t PriorityTicketHolder::queued() const {
- return _pool->queued();
+ return _pool.queued();
}
int64_t PriorityTicketHolder::numFinishedProcessing() const {
@@ -65,22 +63,22 @@ int64_t PriorityTicketHolder::numFinishedProcessing() const {
}
int64_t PriorityTicketHolder::expedited() const {
- return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->expedited();
+ return _pool.getQueue().expedited();
}
int64_t PriorityTicketHolder::bypassed() const {
- return static_cast<SimplePriorityTicketQueue*>(_pool->getQueue())->bypassed();
+ return _pool.getQueue().bypassed();
}
void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) {
- auto queue = static_cast<SimplePriorityTicketQueue*>(_pool->getQueue());
- queue->updateLowPriorityAdmissionBypassThreshold(newBypassThreshold);
+ auto& queue = _pool.getQueue();
+ queue.updateLowPriorityAdmissionBypassThreshold(newBypassThreshold);
}
boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
invariant(admCtx);
- if (_pool->tryAcquire()) {
+ if (_pool.tryAcquire()) {
return Ticket(this, admCtx);
}
@@ -97,11 +95,11 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
// maximum period of time. This may or may not succeed, in which case we will retry until
// timing out or getting interrupted.
auto maxUntil = std::min(until, Date_t::now() + Milliseconds(500));
- auto acquired = _pool->acquire(admCtx, maxUntil);
+ auto acquired = _pool.acquire(admCtx, maxUntil);
ScopeGuard rereleaseIfTimedOutOrInterrupted([&] {
// We may have gotten a ticket that we can't use, release it back to the ticket pool.
if (acquired) {
- _pool->release();
+ _pool.release();
}
});
@@ -125,7 +123,7 @@ boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationC
void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
// 'Immediate' priority operations should bypass the ticketing system completely.
invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
- _pool->release();
+ _pool.release();
}
void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept {
@@ -134,13 +132,13 @@ void PriorityTicketHolder::_resize(int32_t newSize, int32_t oldSize) noexcept {
if (difference > 0) {
// Hand out tickets one-by-one until we've given them all out.
for (auto remaining = difference; remaining > 0; remaining--) {
- _pool->release();
+ _pool.release();
}
} else {
AdmissionContext admCtx;
// Take tickets one-by-one without releasing.
for (auto remaining = -difference; remaining > 0; remaining--) {
- _pool->acquire(&admCtx, Date_t::max());
+ _pool.acquire(&admCtx, Date_t::max());
}
}
}
diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h
index 54c07e640b1..8d36e930758 100644
--- a/src/mongo/util/concurrency/priority_ticketholder.h
+++ b/src/mongo/util/concurrency/priority_ticketholder.h
@@ -51,91 +51,6 @@ enum class QueueType : unsigned int { kLowPriority = 0, kNormalPriority = 1, Num
class Ticket;
/**
- * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority
- * operations into separate queues. Normal priority operations are always scheduled ahead of low
- * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter
- * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead
- * of waiting normal priority operations.
- */
-class SimplePriorityTicketQueue : public TicketQueue {
-public:
- SimplePriorityTicketQueue(int lowPriorityBypassThreshold)
- : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {}
-
- bool empty() const final {
- return _normal.empty() && _low.empty();
- }
-
- void push(std::shared_ptr<TicketWaiter> val) final {
- if (val->context->getPriority() == AdmissionContext::Priority::kLow) {
- _low.push(std::move(val));
- return;
- }
- invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal);
- _normal.push(std::move(val));
- }
-
- std::shared_ptr<TicketWaiter> pop() final {
- if (!_normal.empty() && !_low.empty() && _lowPriorityBypassThreshold.load() > 0 &&
- _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) {
- auto front = std::move(_low.front());
- _low.pop();
- _expeditedLowPriorityAdmissions.addAndFetch(1);
- return front;
- }
- if (!_normal.empty()) {
- auto front = std::move(_normal.front());
- _normal.pop();
- return front;
- }
- auto front = std::move(_low.front());
- _low.pop();
- return front;
- }
-
- /**
- * Number of times low priority operations are expedited for ticket admission over normal
- * priority operations.
- */
- std::int64_t expedited() const {
- return _expeditedLowPriorityAdmissions.loadRelaxed();
- }
-
- /**
- * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
- * normal priority queue when a ticket becomes available.
- */
- std::int64_t bypassed() const {
- return _lowPriorityBypassCount.loadRelaxed();
- }
-
- void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) {
- _lowPriorityBypassThreshold.store(newBypassThreshold);
- }
-
-private:
- /**
- * Limits the number times the low priority queue is non-empty and bypassed in favor of the
- * normal priority queue for the next ticket admission.
- */
- AtomicWord<std::int32_t> _lowPriorityBypassThreshold;
-
- /**
- * Number of times ticket admission is expedited for low priority operations.
- */
- AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
-
- /**
- * Counts the number of times normal operations are dequeued over operations queued in the low
- * priority queue. We explicitly use an unsigned type here because rollover is desired.
- */
- AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
-
- std::queue<std::shared_ptr<TicketWaiter>> _normal;
- std::queue<std::shared_ptr<TicketWaiter>> _low;
-};
-
-/**
* A PriorityTicketHolder supports queueing and prioritization of operations based on
* AdmissionContext::Priority.
*
@@ -193,9 +108,7 @@ private:
}
std::array<QueueStats, static_cast<unsigned int>(QueueType::NumQueues)> _stats;
-
- std::unique_ptr<TicketPool> _pool;
-
ServiceContext* _serviceContext;
+ TicketPool<SimplePriorityTicketQueue> _pool;
};
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_pool.cpp b/src/mongo/util/concurrency/ticket_pool.cpp
index dfda5ebcf00..ca82fdb9884 100644
--- a/src/mongo/util/concurrency/ticket_pool.cpp
+++ b/src/mongo/util/concurrency/ticket_pool.cpp
@@ -111,10 +111,8 @@ static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept {
}
} // namespace
-TicketPool::TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue)
- : _available(numTickets), _queued(0), _waiters(std::move(queue)) {}
-
-bool TicketPool::tryAcquire() {
+template <class Queue>
+bool TicketPool<Queue>::tryAcquire() {
auto available = _available.load();
bool gotTicket = false;
while (available > 0 && !gotTicket) {
@@ -123,7 +121,8 @@ bool TicketPool::tryAcquire() {
return gotTicket;
}
-bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) {
+template <class Queue>
+bool TicketPool<Queue>::acquire(AdmissionContext* admCtx, Date_t deadline) {
auto waiter = std::make_shared<TicketWaiter>();
waiter->context = admCtx;
@@ -134,9 +133,9 @@ bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) {
if (tryAcquire()) {
return true;
}
- _queued.addAndFetch(1);
- _waiters->push(waiter);
+ _waiters.push(waiter);
}
+ _queued.addAndFetch(1);
auto res = atomic_wait(waiter->futexWord, TicketWaiter::State::Waiting, deadline);
if (res == stdx::cv_status::timeout) {
@@ -156,22 +155,23 @@ bool TicketPool::acquire(AdmissionContext* admCtx, Date_t deadline) {
return true;
}
-std::shared_ptr<TicketWaiter> TicketPool::_popWaiterOrAddTicketToPool() {
+template <class Queue>
+std::shared_ptr<TicketWaiter> TicketPool<Queue>::_popWaiterOrAddTicketToPool() {
stdx::unique_lock<Mutex> lock(_mutex);
- if (_waiters->empty()) {
+ auto waiter = _waiters.pop();
+ if (!waiter) {
// We need to ensure we add the ticket back to the pool while holding the mutex. This
// prevents a soon-to-be waiter from missing an available ticket. Otherwise, we could
// leave a waiter in the queue without ever waking it.
_available.addAndFetch(1);
- return nullptr;
}
- auto waiter = _waiters->pop();
- _queued.subtractAndFetch(1);
return waiter;
}
-void TicketPool::_release() {
+template <class Queue>
+void TicketPool<Queue>::_release() {
while (auto waiter = _popWaiterOrAddTicketToPool()) {
+ _queued.subtractAndFetch(1);
auto state = static_cast<uint32_t>(TicketWaiter::State::Waiting);
if (waiter->futexWord.compareAndSwap(&state, TicketWaiter::State::Acquired)) {
atomic_notify_one(waiter->futexWord);
@@ -183,18 +183,12 @@ void TicketPool::_release() {
}
}
-void TicketPool::release() {
+template <class Queue>
+void TicketPool<Queue>::release() {
_release();
}
-bool TicketPool::releaseIfWaiters() {
- // This is prone to race conditions, but is intended as a fast-path to avoid taking the mutex
- // unnecessarily.
- if (_queued.load()) {
- _release();
- return true;
- }
- return false;
-}
+template class TicketPool<FifoTicketQueue>;
+template class TicketPool<SimplePriorityTicketQueue>;
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_pool.h b/src/mongo/util/concurrency/ticket_pool.h
index 4cdfaf9dce7..19797da4f70 100644
--- a/src/mongo/util/concurrency/ticket_pool.h
+++ b/src/mongo/util/concurrency/ticket_pool.h
@@ -29,7 +29,7 @@
#pragma once
-#include <list>
+#include <queue>
#include "mongo/platform/atomic_word.h"
#include "mongo/util/concurrency/admission_context.h"
@@ -64,7 +64,6 @@ struct TicketWaiter {
class TicketQueue {
public:
virtual ~TicketQueue(){};
- virtual bool empty() const = 0;
virtual void push(std::shared_ptr<TicketWaiter>) = 0;
virtual std::shared_ptr<TicketWaiter> pop() = 0;
};
@@ -75,33 +74,122 @@ public:
*/
class FifoTicketQueue : public TicketQueue {
public:
- bool empty() const {
- return _queue.empty();
- }
-
void push(std::shared_ptr<TicketWaiter> val) {
- _queue.push_back(std::move(val));
+ _queue.push(std::move(val));
}
std::shared_ptr<TicketWaiter> pop() {
+ if (_queue.empty()) {
+ return nullptr;
+ }
auto front = std::move(_queue.front());
- _queue.pop_front();
+ _queue.pop();
return front;
}
private:
- std::list<std::shared_ptr<TicketWaiter>> _queue;
+ std::queue<std::shared_ptr<TicketWaiter>> _queue;
};
/**
+ * This SimplePriorityTicketQueue implements a queue policy that separates normal and low priority
+ * operations into separate queues. Normal priority operations are always scheduled ahead of low
+ * priority ones, except when a positive lowPriorityBypassThreshold is provided. This parameter
+ * specifies how often a waiting low-priority operation should skip the queue and be scheduled ahead
+ * of waiting normal priority operations.
+ */
+class SimplePriorityTicketQueue : public TicketQueue {
+public:
+ SimplePriorityTicketQueue(int lowPriorityBypassThreshold)
+ : _lowPriorityBypassThreshold(lowPriorityBypassThreshold) {}
+
+ void push(std::shared_ptr<TicketWaiter> val) final {
+ if (val->context->getPriority() == AdmissionContext::Priority::kLow) {
+ _low.push(std::move(val));
+ return;
+ }
+ invariant(val->context->getPriority() == AdmissionContext::Priority::kNormal);
+ _normal.push(std::move(val));
+ }
+
+ std::shared_ptr<TicketWaiter> pop() final {
+ auto normalQueued = !_normal.empty();
+ auto lowQueued = !_low.empty();
+ if (!normalQueued && !lowQueued) {
+ return nullptr;
+ }
+ if (normalQueued && lowQueued && _lowPriorityBypassThreshold.load() > 0 &&
+ _lowPriorityBypassCount.fetchAndAdd(1) % _lowPriorityBypassThreshold.load() == 0) {
+ auto front = std::move(_low.front());
+ _low.pop();
+ _expeditedLowPriorityAdmissions.addAndFetch(1);
+ return front;
+ }
+ if (normalQueued) {
+ auto front = std::move(_normal.front());
+ _normal.pop();
+ return front;
+ }
+ auto front = std::move(_low.front());
+ _low.pop();
+ return front;
+ }
+
+ /**
+ * Number of times low priority operations are expedited for ticket admission over normal
+ * priority operations.
+ */
+ std::int64_t expedited() const {
+ return _expeditedLowPriorityAdmissions.loadRelaxed();
+ }
+
+ /**
+ * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
+ * normal priority queue when a ticket becomes available.
+ */
+ std::int64_t bypassed() const {
+ return _lowPriorityBypassCount.loadRelaxed();
+ }
+
+ void updateLowPriorityAdmissionBypassThreshold(int32_t newBypassThreshold) {
+ _lowPriorityBypassThreshold.store(newBypassThreshold);
+ }
+
+private:
+ /**
+ * Limits the number times the low priority queue is non-empty and bypassed in favor of the
+ * normal priority queue for the next ticket admission.
+ */
+ AtomicWord<std::int32_t> _lowPriorityBypassThreshold;
+
+ /**
+ * Number of times ticket admission is expedited for low priority operations.
+ */
+ AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
+
+ /**
+ * Counts the number of times normal operations are dequeued over operations queued in the low
+ * priority queue. We explicitly use an unsigned type here because rollover is desired.
+ */
+ AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+
+ std::queue<std::shared_ptr<TicketWaiter>> _normal;
+ std::queue<std::shared_ptr<TicketWaiter>> _low;
+};
+
+
+/**
* A TicketPool holds tickets and queues waiters in the provided TicketQueue. The TicketPool
* attempts to emulate a semaphore with a custom queueing policy.
*
* All public functions are thread-safe except where explicitly stated otherwise.
*/
+template <class Queue>
class TicketPool {
public:
- TicketPool(int numTickets, std::unique_ptr<TicketQueue> queue);
+ template <typename... Args>
+ TicketPool(int numTickets, Args&&... args)
+ : _available(numTickets), _queued(0), _waiters(std::forward<Args>(args)...) {}
/**
* Attempt to acquire a ticket without blocking. Returns true if a ticket was granted.
@@ -119,12 +207,6 @@ public:
void release();
/**
- * If there are queued operations, releases a ticket and returns true. Otherwise, does nothing
- * and returns false.
- */
- bool releaseIfWaiters();
-
- /**
* Returns the number of tickets available.
*/
int32_t available() const {
@@ -142,8 +224,12 @@ public:
* Provides direct access to the underlying queue. Callers must ensure they only use thread-safe
* functions.
*/
- TicketQueue* getQueue() {
- return _waiters.get();
+ const Queue& getQueue() const {
+ return _waiters;
+ }
+
+ Queue& getQueue() {
+ return _waiters;
}
private:
@@ -167,6 +253,6 @@ private:
// This mutex protects the _waiters queue by preventing items from being added and removed, but
// does not protect the elements of the queue.
Mutex _mutex = MONGO_MAKE_LATCH("TicketPool::_mutex");
- std::unique_ptr<TicketQueue> _waiters;
+ Queue _waiters;
};
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_pool_test.cpp b/src/mongo/util/concurrency/ticket_pool_test.cpp
index 051ac3a3e00..80c88e809d4 100644
--- a/src/mongo/util/concurrency/ticket_pool_test.cpp
+++ b/src/mongo/util/concurrency/ticket_pool_test.cpp
@@ -77,7 +77,7 @@ static inline const Milliseconds kSleepTime{1};
}
TEST(TicketPoolTest, BasicTimeout) {
- TicketPool pool(0, std::make_unique<FifoTicketQueue>());
+ TicketPool<FifoTicketQueue> pool(0);
{
AdmissionContext ctx;
@@ -91,7 +91,7 @@ TEST(TicketPoolTest, BasicTimeout) {
}
TEST(TicketPoolTest, HandOverWorks) {
- TicketPool pool(0, std::make_unique<FifoTicketQueue>());
+ TicketPool<FifoTicketQueue> pool(0);
{
ASSERT_FALSE(pool.tryAcquire());
@@ -106,7 +106,7 @@ TEST(TicketPoolTest, HandOverWorks) {
return true;
});
- { ASSERT_TRUE(pool.releaseIfWaiters()); }
+ pool.release();
waitingThread.join();
}
@@ -130,7 +130,7 @@ TEST(TicketPoolTest, HandOverWorks) {
});
for (int i = 1; i <= threadsToTest; i++) {
- ASSERT_TRUE(pool.releaseIfWaiters());
+ pool.release();
assertSoon([&] {
ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i);
return true;