summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-09-23 09:39:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-23 10:37:17 +0000
commit509d5d5bedab5c2b52df7b4e6458ef2a1cd607b3 (patch)
treeb1d2eed62e605f50e00c72d2c635952d0d31b971 /src
parent2810900e2b04bce2170b2def8cfdcc22512e65f8 (diff)
downloadmongo-509d5d5bedab5c2b52df7b4e6458ef2a1cd607b3.tar.gz
SERVER-69894 Combine PriorityTicketHolder and SchedulingTicketHolder
Diffstat (limited to 'src')
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp106
-rw-r--r--src/mongo/util/concurrency/ticketholder.h84
2 files changed, 80 insertions, 110 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index d910f292129..554a5f1c888 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -67,6 +67,28 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext,
queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
}
+/**
+ * Appends the standard statistics stored in QueueStats to BSONObjBuilder b;
+ */
+void appendCommonQueueImplStats(BSONObjBuilder& b,
+ const TicketHolderWithQueueingStats::QueueStats& stats) {
+ auto removed = stats.totalRemovedQueue.loadRelaxed();
+ auto added = stats.totalAddedQueue.loadRelaxed();
+
+ b.append("addedToQueue", added);
+ b.append("removedFromQueue", removed);
+ b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
+
+ auto finished = stats.totalFinishedProcessing.loadRelaxed();
+ auto started = stats.totalStartedProcessing.loadRelaxed();
+ b.append("startedProcessing", started);
+ b.append("processing", std::max(static_cast<int>(started - finished), 0));
+ b.append("finishedProcessing", finished);
+ b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed());
+ b.append("canceled", stats.totalCanceled.loadRelaxed());
+ b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed());
+ b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
+}
} // namespace
TicketHolder* TicketHolder::get(ServiceContext* svcCtx) {
@@ -283,20 +305,7 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
}
void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
- auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
- auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
- b.append("addedToQueue", added);
- b.append("removedFromQueue", removed);
- b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
- auto finished = _semaphoreStats.totalFinishedProcessing.loadRelaxed();
- auto started = _semaphoreStats.totalStartedProcessing.loadRelaxed();
- b.append("startedProcessing", started);
- b.append("processing", std::max(static_cast<int>(started - finished), 0));
- b.append("finishedProcessing", finished);
- b.append("totalTimeProcessingMicros", _semaphoreStats.totalTimeProcessingMicros.loadRelaxed());
- b.append("canceled", _semaphoreStats.totalCanceled.loadRelaxed());
- b.append("newAdmissions", _semaphoreStats.totalNewAdmissions.loadRelaxed());
- b.append("totalTimeQueuedMicros", _semaphoreStats.totalTimeQueuedMicros.loadRelaxed());
+ appendCommonQueueImplStats(b, _semaphoreStats);
}
#if defined(__linux__)
namespace {
@@ -486,11 +495,9 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
}
#endif
-SchedulingTicketHolder::SchedulingTicketHolder(int numTickets,
- unsigned int numQueues,
- ServiceContext* serviceContext)
+PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext)
: TicketHolderWithQueueingStats(numTickets, serviceContext), _serviceContext(serviceContext) {
- for (std::size_t i = 0; i < numQueues; i++) {
+ for (std::size_t i = 0; i < static_cast<unsigned int>(QueueType::QueueTypeSize); i++) {
_queues.emplace_back(this);
}
_queues.shrink_to_fit();
@@ -498,17 +505,17 @@ SchedulingTicketHolder::SchedulingTicketHolder(int numTickets,
_enqueuedElements.store(0);
}
-SchedulingTicketHolder::~SchedulingTicketHolder() {}
+PriorityTicketHolder::~PriorityTicketHolder() {}
-int SchedulingTicketHolder::available() const {
+int PriorityTicketHolder::available() const {
return _ticketsAvailable.load();
}
-int SchedulingTicketHolder::queued() const {
+int PriorityTicketHolder::queued() const {
return _enqueuedElements.loadRelaxed();
}
-void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
+void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
// Tickets acquired with priority kImmediate are not generated from the pool of available
// tickets, and thus should never be returned to the pool of available tickets.
invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
@@ -533,7 +540,7 @@ void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx)
_dequeueWaitingThread();
}
-bool SchedulingTicketHolder::_tryAcquireTicket() {
+bool PriorityTicketHolder::_tryAcquireTicket() {
auto remaining = _ticketsAvailable.subtractAndFetch(1);
if (remaining < 0) {
_ticketsAvailable.addAndFetch(1);
@@ -542,7 +549,7 @@ bool SchedulingTicketHolder::_tryAcquireTicket() {
return true;
}
-boost::optional<Ticket> SchedulingTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
+boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
invariant(admCtx);
auto hasAcquired = _tryAcquireTicket();
@@ -552,10 +559,10 @@ boost::optional<Ticket> SchedulingTicketHolder::_tryAcquireImpl(AdmissionContext
return boost::none;
}
-boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) {
+boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) {
invariant(admCtx);
auto& queue = _getQueueToUse(admCtx);
@@ -574,7 +581,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
}
}
-void SchedulingTicketHolder::_resize(int newSize, int oldSize) noexcept {
+void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept {
auto difference = newSize - oldSize;
_ticketsAvailable.fetchAndAdd(difference);
@@ -592,7 +599,7 @@ void SchedulingTicketHolder::_resize(int newSize, int oldSize) noexcept {
// have to wait until the current ticket holders release their tickets.
}
-bool SchedulingTicketHolder::Queue::attemptToDequeue() {
+bool PriorityTicketHolder::Queue::attemptToDequeue() {
auto threadsToBeWoken = _threadsToBeWoken.load();
while (threadsToBeWoken < _queuedThreads) {
auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1);
@@ -604,7 +611,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() {
return false;
}
-void SchedulingTicketHolder::Queue::_signalThreadWoken() {
+void PriorityTicketHolder::Queue::_signalThreadWoken() {
auto currentThreadsToBeWoken = _threadsToBeWoken.load();
while (currentThreadsToBeWoken > 0) {
if (_threadsToBeWoken.compareAndSwap(&currentThreadsToBeWoken,
@@ -614,10 +621,10 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() {
}
}
-bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx,
- EnqueuerLockGuard& queueLock,
- const Date_t& until,
- WaitMode waitMode) {
+bool PriorityTicketHolder::Queue::enqueue(OperationContext* opCtx,
+ EnqueuerLockGuard& queueLock,
+ const Date_t& until,
+ WaitMode waitMode) {
_queuedThreads++;
// Before exiting we remove ourselves from the count of queued threads, we are still holding the
// lock here so this is safe.
@@ -653,9 +660,6 @@ bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx,
return true;
}
-PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext)
- : SchedulingTicketHolder(numTickets, 3, serviceContext) {}
-
void PriorityTicketHolder::_dequeueWaitingThread() {
// There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since
// 'kImmediate' operations should always bypass the need to queue.
@@ -673,14 +677,14 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
BSONObjBuilder bbb(b.subobjStart("lowPriority"));
auto& lowPriorityTicketStats =
_queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats();
- _appendPriorityStats(bbb, lowPriorityTicketStats);
+ appendCommonQueueImplStats(bbb, lowPriorityTicketStats);
bbb.done();
}
{
BSONObjBuilder bbb(b.subobjStart("normalPriority"));
auto& normalPriorityTicketStats =
_queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats();
- _appendPriorityStats(bbb, normalPriorityTicketStats);
+ appendCommonQueueImplStats(bbb, normalPriorityTicketStats);
bbb.done();
}
{
@@ -702,27 +706,7 @@ void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
}
}
-void PriorityTicketHolder::_appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const {
- auto removed = stats.totalRemovedQueue.loadRelaxed();
- auto added = stats.totalAddedQueue.loadRelaxed();
-
- b.append("addedToQueue", added);
- b.append("removedFromQueue", removed);
- b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
-
- auto finished = stats.totalFinishedProcessing.loadRelaxed();
- auto started = stats.totalStartedProcessing.loadRelaxed();
- b.append("startedProcessing", started);
- b.append("processing", std::max(static_cast<int>(started - finished), 0));
- b.append("finishedProcessing", finished);
- b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed());
- b.append("canceled", stats.totalCanceled.loadRelaxed());
- b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed());
- b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
-}
-
-SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(
- const AdmissionContext* admCtx) noexcept {
+PriorityTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(const AdmissionContext* admCtx) {
auto priority = admCtx->getPriority();
switch (priority) {
case AdmissionContext::Priority::kLow:
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 60f5d167769..65cfddb753a 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -324,7 +324,21 @@ private:
* Waiters will get placed in a specific internal queue according to some logic.
* Releasers will wake up a waiter from a group chosen according to some logic.
*/
-class SchedulingTicketHolder : public TicketHolderWithQueueingStats {
+class PriorityTicketHolder : public TicketHolderWithQueueingStats {
+protected:
+public:
+ explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext);
+ ~PriorityTicketHolder() override;
+
+ int available() const override final;
+
+ int queued() const override final;
+
+ bool recordImmediateTicketStatistics() noexcept override final {
+ return true;
+ };
+
+private:
// Using a shared_mutex is fine here because usual considerations for avoiding them do not apply
// in this case:
// * Operations are short and do not block while holding the lock (i.e. they only do CPU-bound
@@ -338,10 +352,10 @@ class SchedulingTicketHolder : public TicketHolderWithQueueingStats {
using QueueMutex = std::shared_mutex; // NOLINT
using ReleaserLockGuard = std::shared_lock<QueueMutex>; // NOLINT
using EnqueuerLockGuard = std::unique_lock<QueueMutex>; // NOLINT
-protected:
+
class Queue {
public:
- Queue(SchedulingTicketHolder* holder) : _holder(holder){};
+ Queue(PriorityTicketHolder* holder) : _holder(holder){};
Queue(Queue&& other)
: _queuedThreads(other._queuedThreads),
@@ -378,28 +392,19 @@ protected:
int _queuedThreads{0};
AtomicWord<int> _threadsToBeWoken{0};
stdx::condition_variable _cv;
- SchedulingTicketHolder* _holder;
+ PriorityTicketHolder* _holder;
QueueStats _stats;
};
- std::vector<Queue> _queues;
-
-public:
- explicit SchedulingTicketHolder(int numTickets,
- unsigned int numQueues,
- ServiceContext* serviceContext);
- ~SchedulingTicketHolder() override;
-
- int available() const override final;
-
- int queued() const override final;
-
- bool recordImmediateTicketStatistics() noexcept override final {
- return true;
+ enum class QueueType : unsigned int {
+ LowPriorityQueue = 0,
+ NormalPriorityQueue = 1,
+ // Exclusively used for statistics tracking. This queue should never have any processes
+ // 'queued'.
+ ImmediatePriorityNoOpQueue = 2,
+ QueueTypeSize = 3
};
-private:
- bool _tryAcquireTicket();
boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
@@ -412,7 +417,11 @@ private:
void _resize(int newSize, int oldSize) noexcept override final;
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0;
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
+
+ void _appendImplStats(BSONObjBuilder& b) const override final;
+
+ bool _tryAcquireTicket();
/**
* Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
@@ -426,14 +435,14 @@ private:
* - The number of items in each queue will not change during the execution
* - No other thread will proceed to wait during the execution of the method
*/
- virtual void _dequeueWaitingThread() = 0;
-
- void _appendImplStats(BSONObjBuilder& b) const override = 0;
+ void _dequeueWaitingThread();
/**
* Selects the queue to use for the current thread given the provided arguments.
*/
- virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0;
+ Queue& _getQueueToUse(const AdmissionContext* admCtx);
+
+ std::vector<Queue> _queues;
QueueMutex _queueMutex;
AtomicWord<int> _ticketsAvailable;
@@ -441,29 +450,6 @@ private:
ServiceContext* _serviceContext;
};
-class PriorityTicketHolder final : public SchedulingTicketHolder {
-public:
- explicit PriorityTicketHolder(int numTickets, ServiceContext* serviceContext);
-
-private:
- enum class QueueType : unsigned int {
- LowPriorityQueue = 0,
- NormalPriorityQueue = 1,
- // Exclusively used for statistics tracking. This queue should never have any processes
- // 'queued'.
- ImmediatePriorityNoOpQueue = 2,
- QueueTypeSize = 3
- };
-
- void _dequeueWaitingThread() override final;
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
-
- void _appendImplStats(BSONObjBuilder& b) const override final;
- void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const;
-
- Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final;
-};
-
/**
* RAII-style movable token that gets generated when a ticket is acquired and is automatically
* released when going out of scope.
@@ -473,7 +459,7 @@ class Ticket {
friend class ReaderWriterTicketHolder;
friend class TicketHolderWithQueueingStats;
friend class SemaphoreTicketHolder;
- friend class SchedulingTicketHolder;
+ friend class PriorityTicketHolder;
public:
Ticket(Ticket&& t) : _ticketholder(t._ticketholder), _admissionContext(t._admissionContext) {