summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-11-14 13:11:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-14 13:42:54 +0000
commit868c7cb0a72c4f3e69d8ec2466000a6e6db5322b (patch)
treed1d4bc583450b6dcc98b2f88bd202c5d95519ea0
parent2c485054578b7172439da7103b4d9a35881e087d (diff)
downloadmongo-868c7cb0a72c4f3e69d8ec2466000a6e6db5322b.tar.gz
SERVER-71251 Move PriorityTicketHolder and SemaphoreTicketHolder into separate files
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp1
-rw-r--r--src/mongo/db/storage/storage_engine_init.cpp2
-rw-r--r--src/mongo/db/storage/ticketholder_manager.cpp2
-rw-r--r--src/mongo/dbtests/threadedtests.cpp1
-rw-r--r--src/mongo/util/concurrency/SConscript5
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.cpp241
-rw-r--r--src/mongo/util/concurrency/priority_ticketholder.h189
-rw-r--r--src/mongo/util/concurrency/semaphore_ticketholder.cpp236
-rw-r--r--src/mongo/util/concurrency/semaphore_ticketholder.h100
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp424
-rw-r--r--src/mongo/util/concurrency/ticketholder.h200
-rw-r--r--src/mongo/util/concurrency/ticketholder_bm.cpp2
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp3
13 files changed, 800 insertions, 606 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index c8785d4c822..5a922bea581 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/stdx/future.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/progress_meter.h"
#include "mongo/util/scopeguard.h"
diff --git a/src/mongo/db/storage/storage_engine_init.cpp b/src/mongo/db/storage/storage_engine_init.cpp
index aa0c0334a97..d93e307b8fc 100644
--- a/src/mongo/db/storage/storage_engine_init.cpp
+++ b/src/mongo/db/storage/storage_engine_init.cpp
@@ -50,6 +50,8 @@
#include "mongo/db/storage/storage_repair_observer.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/priority_ticketholder.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/str.h"
diff --git a/src/mongo/db/storage/ticketholder_manager.cpp b/src/mongo/db/storage/ticketholder_manager.cpp
index 6cf35ab2bd1..3457cd43655 100644
--- a/src/mongo/db/storage/ticketholder_manager.cpp
+++ b/src/mongo/db/storage/ticketholder_manager.cpp
@@ -29,6 +29,8 @@
#include "mongo/db/storage/ticketholder_manager.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/priority_ticketholder.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/concurrency/ticketholder.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp
index a8132efb631..942a1f7b262 100644
--- a/src/mongo/dbtests/threadedtests.cpp
+++ b/src/mongo/dbtests/threadedtests.cpp
@@ -41,6 +41,7 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/bits.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/timer.h"
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index f8a514014b1..de664d85453 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -24,7 +24,10 @@ env.Library(
env.Library(
target='ticketholder',
- source=['ticketholder.cpp', 'ticket_queues.cpp'],
+ source=[
+ 'priority_ticketholder.cpp', 'semaphore_ticketholder.cpp', 'ticketholder.cpp',
+ 'ticket_queues.cpp'
+ ],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/service_context',
diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp
new file mode 100644
index 00000000000..ab234ee3c8f
--- /dev/null
+++ b/src/mongo/util/concurrency/priority_ticketholder.cpp
@@ -0,0 +1,241 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/service_context.h"
+#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/concurrency/priority_ticketholder.h"
+
+#include <iostream>
+
+#include "mongo/logv2/log.h"
+#include "mongo/util/str.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+namespace mongo {
+PriorityTicketHolder::PriorityTicketHolder(int numTickets,
+ int lowPriorityBypassThreshold,
+ ServiceContext* serviceContext)
+ : TicketHolderWithQueueingStats(numTickets, serviceContext),
+ _lowPriorityBypassThreshold(lowPriorityBypassThreshold),
+ _serviceContext(serviceContext) {
+ _ticketsAvailable.store(numTickets);
+ _enqueuedElements.store(0);
+}
+
+void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(
+ const int& newBypassThreshold) {
+ ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex);
+ _lowPriorityBypassThreshold = newBypassThreshold;
+}
+
+boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
+ invariant(admCtx);
+ // Low priority operations cannot use optimistic ticket acquisition and will go to the queue
+ // instead. This is done to prevent them from skipping the line before other high-priority
+ // operations.
+ if (admCtx->getPriority() >= AdmissionContext::Priority::kNormal) {
+ auto hasAcquired = _tryAcquireTicket();
+ if (hasAcquired) {
+ return Ticket{this, admCtx};
+ }
+ }
+ return boost::none;
+}
+
+boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) {
+ invariant(admCtx);
+
+ auto queueType = _getQueueType(admCtx);
+ auto& queue = _getQueue(queueType);
+
+ bool interruptible = waitMode == WaitMode::kInterruptible;
+
+ _enqueuedElements.addAndFetch(1);
+ ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
+
+ ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex);
+ do {
+ while (_ticketsAvailable.load() <= 0 ||
+ _hasToWaitForHigherPriority(uniqueQueueLock, queueType)) {
+ bool hasTimedOut = !queue.enqueue(uniqueQueueLock, opCtx, until, interruptible);
+ if (hasTimedOut) {
+ return boost::none;
+ }
+ }
+ } while (!_tryAcquireTicket());
+ return Ticket{this, admCtx};
+}
+
+void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
+ // Tickets acquired with priority kImmediate are not generated from the pool of available
+ // tickets, and thus should never be returned to the pool of available tickets.
+ invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
+
+ // The idea behind the release mechanism consists of a consistent view of queued elements
+ // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will
+ // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking
+ // one, having increased the number of woken threads for accuracy. Once the thread gets woken it
+ // will then decrease the number of woken threads (as it has been woken) and then attempt to
+ // acquire a ticket. The two possible states are either one or more releasers releasing or a
+ // thread waking up due to the RW mutex.
+ //
+ // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on
+ // them, only waking threads is allowed.
+ ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex);
+ _ticketsAvailable.addAndFetch(1);
+ _dequeueWaitingThread(sharedQueueLock);
+}
+
+void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept {
+ auto difference = newSize - oldSize;
+
+ _ticketsAvailable.fetchAndAdd(difference);
+
+ if (difference > 0) {
+ // As we're adding tickets the waiting threads need to be notified that there are new
+ // tickets available.
+ ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex);
+ for (int i = 0; i < difference; i++) {
+ _dequeueWaitingThread(sharedQueueLock);
+ }
+ }
+
+ // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll
+ // have to wait until the current ticket holders release their tickets.
+}
+
+TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
+ const AdmissionContext* admCtx) noexcept {
+ auto queueType = _getQueueType(admCtx);
+ return _stats[_enumToInt(queueType)];
+}
+
+void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
+ {
+ BSONObjBuilder bbb(b.subobjStart("lowPriority"));
+ const auto& lowPriorityTicketStats = _stats[_enumToInt(QueueType::kLowPriority)];
+ _appendCommonQueueImplStats(bbb, lowPriorityTicketStats);
+ bbb.append("expedited", expedited());
+ bbb.append("bypassed", bypassed());
+ bbb.done();
+ }
+ {
+ BSONObjBuilder bbb(b.subobjStart("normalPriority"));
+ const auto& normalPriorityTicketStats = _stats[_enumToInt(QueueType::kNormalPriority)];
+ _appendCommonQueueImplStats(bbb, normalPriorityTicketStats);
+ bbb.done();
+ }
+ {
+ BSONObjBuilder bbb(b.subobjStart("immediatePriority"));
+ // Since 'kImmediate' priority operations will never queue, omit queueing statistics that
+ // will always be 0.
+ const auto& immediateTicketStats = _stats[_enumToInt(QueueType::kImmediatePriority)];
+
+ auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed();
+ auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed();
+ bbb.append("startedProcessing", started);
+ bbb.append("processing", std::max(static_cast<int>(started - finished), 0));
+ bbb.append("finishedProcessing", finished);
+ bbb.append("totalTimeProcessingMicros",
+ immediateTicketStats.totalTimeProcessingMicros.loadRelaxed());
+ bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed());
+ bbb.done();
+ }
+}
+
+bool PriorityTicketHolder::_tryAcquireTicket() {
+ auto remaining = _ticketsAvailable.subtractAndFetch(1);
+ if (remaining < 0) {
+ _ticketsAvailable.addAndFetch(1);
+ return false;
+ }
+ return true;
+}
+
+void PriorityTicketHolder::_dequeueWaitingThread(
+ const ticket_queues::SharedLockGuard& sharedQueueLock) {
+ // There are only 2 possible queues to dequeue from - the low priority and normal priority
+ // queues. There will never be anything to dequeue from the immediate priority queue, given
+ // immediate priority operations will never wait for ticket admission.
+ auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority);
+ auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
+
+ // There is a guarantee that the number of queued elements cannot change while holding the
+ // shared queue lock.
+ auto lowQueueCount = lowPriorityQueue.queuedElems();
+ auto normalQueueCount = normalPriorityQueue.queuedElems();
+
+ if (lowQueueCount == 0 && normalQueueCount == 0) {
+ return;
+ }
+ if (lowQueueCount == 0) {
+ normalPriorityQueue.attemptToDequeue(sharedQueueLock);
+ return;
+ }
+ if (normalQueueCount == 0) {
+ lowPriorityQueue.attemptToDequeue(sharedQueueLock);
+ return;
+ }
+
+ // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the
+ // normal priority queue until the bypass threshold is met.
+ if (_lowPriorityBypassThreshold > 0 &&
+ _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) {
+ if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) {
+ _expeditedLowPriorityAdmissions.addAndFetch(1);
+ } else {
+ normalPriorityQueue.attemptToDequeue(sharedQueueLock);
+ }
+ return;
+ }
+
+ if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) {
+ lowPriorityQueue.attemptToDequeue(sharedQueueLock);
+ }
+}
+
+bool PriorityTicketHolder::_hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk,
+ QueueType queue) {
+ switch (queue) {
+ case QueueType::kLowPriority: {
+ const auto& normalQueue = _getQueue(QueueType::kNormalPriority);
+ auto pending = normalQueue.getThreadsPendingToWake();
+ return pending != 0 && pending >= _ticketsAvailable.load();
+ }
+ default:
+ return false;
+ }
+}
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h
new file mode 100644
index 00000000000..274fcb661c4
--- /dev/null
+++ b/src/mongo/util/concurrency/priority_ticketholder.h
@@ -0,0 +1,189 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/concurrency/ticket_queues.h"
+#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class Ticket;
+
+/**
+ * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get
+ * placed in a specific internal queue according to some logic. Releasers will wake up a waiter
+ * from a group chosen according to some logic.
+ */
+class PriorityTicketHolder : public TicketHolderWithQueueingStats {
+public:
+ explicit PriorityTicketHolder(int numTickets,
+ int lowPriorityBypassThreshold,
+ ServiceContext* serviceContext);
+ ~PriorityTicketHolder() override{};
+
+ int available() const override final {
+ return _ticketsAvailable.load();
+ };
+
+ int queued() const override final {
+ return _enqueuedElements.loadRelaxed();
+ }
+
+ bool recordImmediateTicketStatistics() noexcept override final {
+ return true;
+ };
+
+ /**
+ * Number of times low priority operations are expedited for ticket admission over normal
+ * priority operations.
+ */
+ std::int64_t expedited() const {
+ return _expeditedLowPriorityAdmissions.loadRelaxed();
+ }
+
+ /**
+ * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
+ * normal priority queue when a ticket becomes available.
+ */
+ std::int64_t bypassed() const {
+ return _lowPriorityBypassCount.loadRelaxed();
+ };
+
+ void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
+
+private:
+ enum class QueueType : unsigned int {
+ kLowPriority = 0,
+ kNormalPriority = 1,
+ // Exclusively used for statistics tracking. This queue should never have any processes
+ // 'queued'.
+ kImmediatePriority = 2,
+ QueueTypeSize = 3
+ };
+
+ boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
+
+ boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) override final;
+
+ void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
+
+ void _resize(int newSize, int oldSize) noexcept override final;
+
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
+
+ void _appendImplStats(BSONObjBuilder& b) const override final;
+
+ bool _tryAcquireTicket();
+
+ /**
+ * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
+ * Implementors MUST wake at least one waiting thread if at least one thread is pending to be
+ * woken between all the queues. In other words, attemptToDequeue on each non-empty Queue must
+ * be called until either it returns true at least once or has been called on all queues.
+ *
+ * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block.
+ *
+ * When called the following invariants will be held:
+ * - The number of items in each queue will not change during the execution
+ * - No other thread will proceed to wait during the execution of the method
+ */
+ void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock);
+
+ /**
+ * Returns whether there are higher priority threads pending to get a ticket in front of the
+ * given queue type and not enough tickets for all of them.
+ */
+ bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType);
+
+ unsigned int _enumToInt(QueueType queueType) {
+ return static_cast<unsigned int>(queueType);
+ }
+ unsigned int _enumToInt(QueueType queueType) const {
+ return static_cast<unsigned int>(queueType);
+ }
+
+ ticket_queues::Queue& _getQueue(QueueType queueType) {
+ return _queues[_enumToInt(queueType)];
+ }
+
+
+ QueueType _getQueueType(const AdmissionContext* admCtx) {
+ auto priority = admCtx->getPriority();
+ switch (priority) {
+ case AdmissionContext::Priority::kLow:
+ return QueueType::kLowPriority;
+ case AdmissionContext::Priority::kNormal:
+ return QueueType::kNormalPriority;
+ case AdmissionContext::Priority::kImmediate:
+ return QueueType::kImmediatePriority;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+
+ ticket_queues::QueueMutex _queueMutex;
+ std::array<ticket_queues::Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues;
+ std::array<QueueStats, static_cast<unsigned int>(QueueType::QueueTypeSize)> _stats;
+
+ /**
+ * Limits the number times the low priority queue is non-empty and bypassed in favor of the
+ * normal priority queue for the next ticket admission.
+ *
+ * Updates must be done under the ticket_queues::UniqueLockGuard.
+ */
+ int _lowPriorityBypassThreshold;
+
+ /**
+ * Counts the number of times normal operations are dequeued over operations queued in the low
+ * priority queue.
+ */
+ AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
+
+ /**
+ * Number of times ticket admission is expedited for low priority operations.
+ */
+ AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
+ AtomicWord<int> _ticketsAvailable;
+ AtomicWord<int> _enqueuedElements;
+ ServiceContext* _serviceContext;
+};
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.cpp b/src/mongo/util/concurrency/semaphore_ticketholder.cpp
new file mode 100644
index 00000000000..56e6d84c29f
--- /dev/null
+++ b/src/mongo/util/concurrency/semaphore_ticketholder.cpp
@@ -0,0 +1,236 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/service_context.h"
+#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
+#include "mongo/util/concurrency/ticketholder.h"
+
+#include <iostream>
+
+#include "mongo/logv2/log.h"
+#include "mongo/util/str.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+namespace mongo {
+
+void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
+ _appendCommonQueueImplStats(b, _semaphoreStats);
+}
+#if defined(__linux__)
+namespace {
+
+/**
+ * Accepts an errno code, prints its error message, and exits.
+ */
+void failWithErrno(int err) {
+ LOGV2_FATAL(28604,
+ "error in Ticketholder: {errnoWithDescription_err}",
+ "errnoWithDescription_err"_attr = errorMessage(posixError(err)));
+}
+
+/*
+ * Checks the return value from a Linux semaphore function call, and fails with the set errno if the
+ * call was unsucessful.
+ */
+void check(int ret) {
+ if (ret == 0)
+ return;
+ failWithErrno(errno);
+}
+
+/**
+ * Takes a Date_t deadline and sets the appropriate values in a timespec structure.
+ */
+void tsFromDate(const Date_t& deadline, struct timespec& ts) {
+ ts.tv_sec = deadline.toTimeT();
+ ts.tv_nsec = (deadline.toMillisSinceEpoch() % 1000) * 1'000'000;
+}
+} // namespace
+
+SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext)
+ : TicketHolderWithQueueingStats(numTickets, serviceContext) {
+ check(sem_init(&_sem, 0, numTickets));
+}
+
+SemaphoreTicketHolder::~SemaphoreTicketHolder() {
+ check(sem_destroy(&_sem));
+}
+
+boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
+ while (0 != sem_trywait(&_sem)) {
+ if (errno == EAGAIN)
+ return boost::none;
+ if (errno != EINTR)
+ failWithErrno(errno);
+ }
+ return Ticket{this, admCtx};
+}
+
+boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) {
+ const Milliseconds intervalMs(500);
+ struct timespec ts;
+
+ // To support interrupting ticket acquisition while still benefiting from semaphores, we do a
+ // timed wait on an interval to periodically check for interrupts.
+ // The wait period interval is the smaller of the default interval and the provided
+ // deadline.
+ Date_t deadline = std::min(until, Date_t::now() + intervalMs);
+ tsFromDate(deadline, ts);
+
+ while (0 != sem_timedwait(&_sem, &ts)) {
+ if (errno == ETIMEDOUT) {
+ // If we reached the deadline without being interrupted, we have completely timed out.
+ if (deadline == until)
+ return boost::none;
+
+ deadline = std::min(until, Date_t::now() + intervalMs);
+ tsFromDate(deadline, ts);
+ } else if (errno != EINTR) {
+ failWithErrno(errno);
+ }
+
+ // To correctly handle errors from sem_timedwait, we should check for interrupts last.
+ // It is possible to unset 'errno' after a call to checkForInterrupt().
+ if (waitMode == WaitMode::kInterruptible)
+ opCtx->checkForInterrupt();
+ }
+ return Ticket{this, admCtx};
+}
+
+void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
+ check(sem_post(&_sem));
+}
+
+int SemaphoreTicketHolder::available() const {
+ int val = 0;
+ check(sem_getvalue(&_sem, &val));
+ return val;
+}
+
+void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
+ auto difference = newSize - oldSize;
+
+ if (difference > 0) {
+ for (int i = 0; i < difference; i++) {
+ check(sem_post(&_sem));
+ }
+ } else if (difference < 0) {
+ for (int i = 0; i < -difference; i++) {
+ check(sem_wait(&_sem));
+ }
+ }
+}
+
+#else
+
+SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* svcCtx)
+ : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {}
+
+SemaphoreTicketHolder::~SemaphoreTicketHolder() = default;
+
+boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (!_tryAcquire()) {
+ return boost::none;
+ }
+ return Ticket{this, admCtx};
+}
+
+boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) {
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ bool taken = [&] {
+ if (waitMode == WaitMode::kInterruptible) {
+ return opCtx->waitForConditionOrInterruptUntil(
+ _newTicket, lk, until, [this] { return _tryAcquire(); });
+ } else {
+ if (until == Date_t::max()) {
+ _newTicket.wait(lk, [this] { return _tryAcquire(); });
+ return true;
+ } else {
+ return _newTicket.wait_until(
+ lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); });
+ }
+ }
+ }();
+ if (!taken) {
+ return boost::none;
+ }
+ return Ticket{this, admCtx};
+}
+
+void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _numTickets++;
+ }
+ _newTicket.notify_one();
+}
+
+int SemaphoreTicketHolder::available() const {
+ return _numTickets;
+}
+
+bool SemaphoreTicketHolder::_tryAcquire() {
+ if (_numTickets <= 0) {
+ if (_numTickets < 0) {
+ std::cerr << "DISASTER! in TicketHolder" << std::endl;
+ }
+ return false;
+ }
+ _numTickets--;
+ return true;
+}
+
+void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
+ auto difference = newSize - oldSize;
+
+ stdx::lock_guard<Latch> lk(_mutex);
+ _numTickets += difference;
+
+ if (difference > 0) {
+ for (int i = 0; i < difference; i++) {
+ _newTicket.notify_one();
+ }
+ }
+ // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll
+ // have to wait until the current ticket holders release their tickets.
+}
+#endif
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.h b/src/mongo/util/concurrency/semaphore_ticketholder.h
new file mode 100644
index 00000000000..56606c3da2e
--- /dev/null
+++ b/src/mongo/util/concurrency/semaphore_ticketholder.h
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#if defined(__linux__)
+#include <semaphore.h>
+#endif
+
+#include <queue>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/concurrency/ticket_queues.h"
+#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats {
+public:
+ explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext);
+ ~SemaphoreTicketHolder() override final;
+
+ int available() const override final;
+
+ int queued() const override final {
+ auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
+ auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
+ return std::max(static_cast<int>(added - removed), 0);
+ };
+
+ bool recordImmediateTicketStatistics() noexcept override final {
+ // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing
+ // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked
+ // in the statistics.
+ return false;
+ }
+
+private:
+ boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
+ AdmissionContext* admCtx,
+ Date_t until,
+ WaitMode waitMode) override final;
+
+ boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
+ void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
+
+ void _appendImplStats(BSONObjBuilder& b) const override final;
+
+ void _resize(int newSize, int oldSize) noexcept override final;
+
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final {
+ return _semaphoreStats;
+ }
+#if defined(__linux__)
+ mutable sem_t _sem;
+
+#else
+ bool _tryAcquire();
+
+ int _numTickets;
+ Mutex _mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex");
+ stdx::condition_variable _newTicket;
+#endif
+ QueueStats _semaphoreStats;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index eafed4da466..33e8c05b2c5 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -63,29 +63,6 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext,
admCtx->start(serviceContext->getTickSource());
queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
}
-
-/**
- * Appends the standard statistics stored in QueueStats to BSONObjBuilder b;
- */
-void appendCommonQueueImplStats(BSONObjBuilder& b,
- const TicketHolderWithQueueingStats::QueueStats& stats) {
- auto removed = stats.totalRemovedQueue.loadRelaxed();
- auto added = stats.totalAddedQueue.loadRelaxed();
-
- b.append("addedToQueue", added);
- b.append("removedFromQueue", removed);
- b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
-
- auto finished = stats.totalFinishedProcessing.loadRelaxed();
- auto started = stats.totalStartedProcessing.loadRelaxed();
- b.append("startedProcessing", started);
- b.append("processing", std::max(static_cast<int>(started - finished), 0));
- b.append("finishedProcessing", finished);
- b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed());
- b.append("canceled", stats.totalCanceled.loadRelaxed());
- b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed());
- b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
-}
} // namespace
Ticket TicketHolderWithQueueingStats::acquireImmediateTicket(AdmissionContext* admCtx) {
@@ -187,393 +164,24 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
}
}
-void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
- appendCommonQueueImplStats(b, _semaphoreStats);
-}
-#if defined(__linux__)
-namespace {
-
-/**
- * Accepts an errno code, prints its error message, and exits.
- */
-void failWithErrno(int err) {
- LOGV2_FATAL(28604,
- "error in Ticketholder: {errnoWithDescription_err}",
- "errnoWithDescription_err"_attr = errorMessage(posixError(err)));
-}
-
-/*
- * Checks the return value from a Linux semaphore function call, and fails with the set errno if the
- * call was unsucessful.
- */
-void check(int ret) {
- if (ret == 0)
- return;
- failWithErrno(errno);
-}
-
-/**
- * Takes a Date_t deadline and sets the appropriate values in a timespec structure.
- */
-void tsFromDate(const Date_t& deadline, struct timespec& ts) {
- ts.tv_sec = deadline.toTimeT();
- ts.tv_nsec = (deadline.toMillisSinceEpoch() % 1000) * 1'000'000;
-}
-} // namespace
-
-SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext)
- : TicketHolderWithQueueingStats(numTickets, serviceContext) {
- check(sem_init(&_sem, 0, numTickets));
-}
-
-SemaphoreTicketHolder::~SemaphoreTicketHolder() {
- check(sem_destroy(&_sem));
-}
-
-boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
- while (0 != sem_trywait(&_sem)) {
- if (errno == EAGAIN)
- return boost::none;
- if (errno != EINTR)
- failWithErrno(errno);
- }
- return Ticket{this, admCtx};
-}
-
-boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) {
- const Milliseconds intervalMs(500);
- struct timespec ts;
-
- // To support interrupting ticket acquisition while still benefiting from semaphores, we do a
- // timed wait on an interval to periodically check for interrupts.
- // The wait period interval is the smaller of the default interval and the provided
- // deadline.
- Date_t deadline = std::min(until, Date_t::now() + intervalMs);
- tsFromDate(deadline, ts);
-
- while (0 != sem_timedwait(&_sem, &ts)) {
- if (errno == ETIMEDOUT) {
- // If we reached the deadline without being interrupted, we have completely timed out.
- if (deadline == until)
- return boost::none;
-
- deadline = std::min(until, Date_t::now() + intervalMs);
- tsFromDate(deadline, ts);
- } else if (errno != EINTR) {
- failWithErrno(errno);
- }
-
- // To correctly handle errors from sem_timedwait, we should check for interrupts last.
- // It is possible to unset 'errno' after a call to checkForInterrupt().
- if (waitMode == WaitMode::kInterruptible)
- opCtx->checkForInterrupt();
- }
- return Ticket{this, admCtx};
-}
-
-void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
- check(sem_post(&_sem));
-}
-
-int SemaphoreTicketHolder::available() const {
- int val = 0;
- check(sem_getvalue(&_sem, &val));
- return val;
-}
-
-void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
- auto difference = newSize - oldSize;
-
- if (difference > 0) {
- for (int i = 0; i < difference; i++) {
- check(sem_post(&_sem));
- }
- } else if (difference < 0) {
- for (int i = 0; i < -difference; i++) {
- check(sem_wait(&_sem));
- }
- }
-}
-
-#else
-
-SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* svcCtx)
- : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {}
-
-SemaphoreTicketHolder::~SemaphoreTicketHolder() = default;
-
-boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_tryAcquire()) {
- return boost::none;
- }
- return Ticket{this, admCtx};
-}
-
-boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) {
- stdx::unique_lock<Latch> lk(_mutex);
-
- bool taken = [&] {
- if (waitMode == WaitMode::kInterruptible) {
- return opCtx->waitForConditionOrInterruptUntil(
- _newTicket, lk, until, [this] { return _tryAcquire(); });
- } else {
- if (until == Date_t::max()) {
- _newTicket.wait(lk, [this] { return _tryAcquire(); });
- return true;
- } else {
- return _newTicket.wait_until(
- lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); });
- }
- }
- }();
- if (!taken) {
- return boost::none;
- }
- return Ticket{this, admCtx};
-}
-
-void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _numTickets++;
- }
- _newTicket.notify_one();
-}
-
-int SemaphoreTicketHolder::available() const {
- return _numTickets;
-}
-
-bool SemaphoreTicketHolder::_tryAcquire() {
- if (_numTickets <= 0) {
- if (_numTickets < 0) {
- std::cerr << "DISASTER! in TicketHolder" << std::endl;
- }
- return false;
- }
- _numTickets--;
- return true;
-}
-
-void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
- auto difference = newSize - oldSize;
-
- stdx::lock_guard<Latch> lk(_mutex);
- _numTickets += difference;
-
- if (difference > 0) {
- for (int i = 0; i < difference; i++) {
- _newTicket.notify_one();
- }
- }
- // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll
- // have to wait until the current ticket holders release their tickets.
-}
-#endif
-
-PriorityTicketHolder::PriorityTicketHolder(int numTickets,
- int lowPriorityBypassThreshold,
- ServiceContext* serviceContext)
- : TicketHolderWithQueueingStats(numTickets, serviceContext),
- _lowPriorityBypassThreshold(lowPriorityBypassThreshold),
- _serviceContext(serviceContext) {
- _ticketsAvailable.store(numTickets);
- _enqueuedElements.store(0);
-}
-
-void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold(
- const int& newBypassThreshold) {
- ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex);
- _lowPriorityBypassThreshold = newBypassThreshold;
-}
-
-boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) {
- invariant(admCtx);
- // Low priority operations cannot use optimistic ticket acquisition and will go to the queue
- // instead. This is done to prevent them from skipping the line before other high-priority
- // operations.
- if (admCtx->getPriority() >= AdmissionContext::Priority::kNormal) {
- auto hasAcquired = _tryAcquireTicket();
- if (hasAcquired) {
- return Ticket{this, admCtx};
- }
- }
- return boost::none;
-}
-
-boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) {
- invariant(admCtx);
-
- auto queueType = _getQueueType(admCtx);
- auto& queue = _getQueue(queueType);
-
- bool interruptible = waitMode == WaitMode::kInterruptible;
-
- _enqueuedElements.addAndFetch(1);
- ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
-
- ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex);
- do {
- while (_ticketsAvailable.load() <= 0 ||
- _hasToWaitForHigherPriority(uniqueQueueLock, queueType)) {
- bool hasTimedOut = !queue.enqueue(uniqueQueueLock, opCtx, until, interruptible);
- if (hasTimedOut) {
- return boost::none;
- }
- }
- } while (!_tryAcquireTicket());
- return Ticket{this, admCtx};
-}
-
-void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
- // Tickets acquired with priority kImmediate are not generated from the pool of available
- // tickets, and thus should never be returned to the pool of available tickets.
- invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
-
- // The idea behind the release mechanism consists of a consistent view of queued elements
- // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will
- // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking
- // one, having increased the number of woken threads for accuracy. Once the thread gets woken it
- // will then decrease the number of woken threads (as it has been woken) and then attempt to
- // acquire a ticket. The two possible states are either one or more releasers releasing or a
- // thread waking up due to the RW mutex.
- //
- // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on
- // them, only waking threads is allowed.
- ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex);
- _ticketsAvailable.addAndFetch(1);
- _dequeueWaitingThread(sharedQueueLock);
-}
-
-void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept {
- auto difference = newSize - oldSize;
-
- _ticketsAvailable.fetchAndAdd(difference);
-
- if (difference > 0) {
- // As we're adding tickets the waiting threads need to be notified that there are new
- // tickets available.
- ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex);
- for (int i = 0; i < difference; i++) {
- _dequeueWaitingThread(sharedQueueLock);
- }
- }
-
- // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll
- // have to wait until the current ticket holders release their tickets.
-}
-
-TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
- const AdmissionContext* admCtx) noexcept {
- auto queueType = _getQueueType(admCtx);
- return _stats[_enumToInt(queueType)];
-}
-
-void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
- {
- BSONObjBuilder bbb(b.subobjStart("lowPriority"));
- const auto& lowPriorityTicketStats = _stats[_enumToInt(QueueType::kLowPriority)];
- appendCommonQueueImplStats(bbb, lowPriorityTicketStats);
- bbb.append("expedited", expedited());
- bbb.append("bypassed", bypassed());
- bbb.done();
- }
- {
- BSONObjBuilder bbb(b.subobjStart("normalPriority"));
- const auto& normalPriorityTicketStats = _stats[_enumToInt(QueueType::kNormalPriority)];
- appendCommonQueueImplStats(bbb, normalPriorityTicketStats);
- bbb.done();
- }
- {
- BSONObjBuilder bbb(b.subobjStart("immediatePriority"));
- // Since 'kImmediate' priority operations will never queue, omit queueing statistics that
- // will always be 0.
- const auto& immediateTicketStats = _stats[_enumToInt(QueueType::kImmediatePriority)];
-
- auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed();
- auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed();
- bbb.append("startedProcessing", started);
- bbb.append("processing", std::max(static_cast<int>(started - finished), 0));
- bbb.append("finishedProcessing", finished);
- bbb.append("totalTimeProcessingMicros",
- immediateTicketStats.totalTimeProcessingMicros.loadRelaxed());
- bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed());
- bbb.done();
- }
-}
-
-bool PriorityTicketHolder::_tryAcquireTicket() {
- auto remaining = _ticketsAvailable.subtractAndFetch(1);
- if (remaining < 0) {
- _ticketsAvailable.addAndFetch(1);
- return false;
- }
- return true;
-}
-
-void PriorityTicketHolder::_dequeueWaitingThread(
- const ticket_queues::SharedLockGuard& sharedQueueLock) {
- // There are only 2 possible queues to dequeue from - the low priority and normal priority
- // queues. There will never be anything to dequeue from the immediate priority queue, given
- // immediate priority operations will never wait for ticket admission.
- auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority);
- auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority);
-
- // There is a guarantee that the number of queued elements cannot change while holding the
- // shared queue lock.
- auto lowQueueCount = lowPriorityQueue.queuedElems();
- auto normalQueueCount = normalPriorityQueue.queuedElems();
-
- if (lowQueueCount == 0 && normalQueueCount == 0) {
- return;
- }
- if (lowQueueCount == 0) {
- normalPriorityQueue.attemptToDequeue(sharedQueueLock);
- return;
- }
- if (normalQueueCount == 0) {
- lowPriorityQueue.attemptToDequeue(sharedQueueLock);
- return;
- }
-
- // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the
- // normal priority queue until the bypass threshold is met.
- if (_lowPriorityBypassThreshold > 0 &&
- _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) {
- if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) {
- _expeditedLowPriorityAdmissions.addAndFetch(1);
- } else {
- normalPriorityQueue.attemptToDequeue(sharedQueueLock);
- }
- return;
- }
+void TicketHolderWithQueueingStats::_appendCommonQueueImplStats(BSONObjBuilder& b,
+ const QueueStats& stats) const {
+ auto removed = stats.totalRemovedQueue.loadRelaxed();
+ auto added = stats.totalAddedQueue.loadRelaxed();
- if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) {
- lowPriorityQueue.attemptToDequeue(sharedQueueLock);
- }
-}
+ b.append("addedToQueue", added);
+ b.append("removedFromQueue", removed);
+ b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
-bool PriorityTicketHolder::_hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk,
- QueueType queue) {
- switch (queue) {
- case QueueType::kLowPriority: {
- const auto& normalQueue = _getQueue(QueueType::kNormalPriority);
- auto pending = normalQueue.getThreadsPendingToWake();
- return pending != 0 && pending >= _ticketsAvailable.load();
- }
- default:
- return false;
- }
+ auto finished = stats.totalFinishedProcessing.loadRelaxed();
+ auto started = stats.totalStartedProcessing.loadRelaxed();
+ b.append("startedProcessing", started);
+ b.append("processing", std::max(static_cast<int>(started - finished), 0));
+ b.append("finishedProcessing", finished);
+ b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed());
+ b.append("canceled", stats.totalCanceled.loadRelaxed());
+ b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed());
+ b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
}
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index a87dacca15f..5d16ef1ae25 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -28,12 +28,6 @@
*/
#pragma once
-#if defined(__linux__)
-#include <semaphore.h>
-#endif
-
-#include <queue>
-
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
@@ -41,13 +35,14 @@
#include "mongo/stdx/future.h"
#include "mongo/util/concurrency/admission_context.h"
#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/concurrency/ticket_queues.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
namespace mongo {
class Ticket;
+class PriorityTicketHolder;
+class SemaphoreTicketHolder;
/**
* Maintains and distributes tickets across operations from a limited pool of tickets. The ticketing
@@ -224,198 +219,11 @@ private:
AtomicWord<int> _outof;
protected:
- ServiceContext* _serviceContext;
-};
-
-class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats {
-public:
- explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext);
- ~SemaphoreTicketHolder() override final;
-
- int available() const override final;
-
- int queued() const override final {
- auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
- auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
- return std::max(static_cast<int>(added - removed), 0);
- };
-
- bool recordImmediateTicketStatistics() noexcept override final {
- // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing
- // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked
- // in the statistics.
- return false;
- }
-
-private:
- boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) override final;
-
- boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
- void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
-
- void _appendImplStats(BSONObjBuilder& b) const override final;
-
- void _resize(int newSize, int oldSize) noexcept override final;
-
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final {
- return _semaphoreStats;
- }
-#if defined(__linux__)
- mutable sem_t _sem;
-
-#else
- bool _tryAcquire();
-
- int _numTickets;
- Mutex _mutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex");
- stdx::condition_variable _newTicket;
-#endif
- QueueStats _semaphoreStats;
-};
-
-/**
- * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get
- * placed in a specific internal queue according to some logic. Releasers will wake up a waiter
- * from a group chosen according to some logic.
- */
-class PriorityTicketHolder : public TicketHolderWithQueueingStats {
-public:
- explicit PriorityTicketHolder(int numTickets,
- int lowPriorityBypassThreshold,
- ServiceContext* serviceContext);
- ~PriorityTicketHolder() override{};
-
- int available() const override final {
- return _ticketsAvailable.load();
- };
-
- int queued() const override final {
- return _enqueuedElements.loadRelaxed();
- }
-
- bool recordImmediateTicketStatistics() noexcept override final {
- return true;
- };
-
/**
- * Number of times low priority operations are expedited for ticket admission over normal
- * priority operations.
+ * Appends the standard statistics stored in QueueStats to BSONObjBuilder b;
*/
- std::int64_t expedited() const {
- return _expeditedLowPriorityAdmissions.loadRelaxed();
- }
+ void _appendCommonQueueImplStats(BSONObjBuilder& b, const QueueStats& stats) const;
- /**
- * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the
- * normal priority queue when a ticket becomes available.
- */
- std::int64_t bypassed() const {
- return _lowPriorityBypassCount.loadRelaxed();
- };
-
- void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold);
-
-private:
- enum class QueueType : unsigned int {
- kLowPriority = 0,
- kNormalPriority = 1,
- // Exclusively used for statistics tracking. This queue should never have any processes
- // 'queued'.
- kImmediatePriority = 2,
- QueueTypeSize = 3
- };
-
- boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
-
- boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
- AdmissionContext* admCtx,
- Date_t until,
- WaitMode waitMode) override final;
-
- void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
-
- void _resize(int newSize, int oldSize) noexcept override final;
-
- QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
-
- void _appendImplStats(BSONObjBuilder& b) const override final;
-
- bool _tryAcquireTicket();
-
- /**
- * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
- * Implementors MUST wake at least one waiting thread if at least one thread is pending to be
- * woken between all the queues. In other words, attemptToDequeue on each non-empty Queue must
- * be called until either it returns true at least once or has been called on all queues.
- *
- * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block.
- *
- * When called the following invariants will be held:
- * - The number of items in each queue will not change during the execution
- * - No other thread will proceed to wait during the execution of the method
- */
- void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock);
-
- /**
- * Returns whether there are higher priority threads pending to get a ticket in front of the
- * given queue type and not enough tickets for all of them.
- */
- bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType);
-
- unsigned int _enumToInt(QueueType queueType) {
- return static_cast<unsigned int>(queueType);
- }
- unsigned int _enumToInt(QueueType queueType) const {
- return static_cast<unsigned int>(queueType);
- }
-
- ticket_queues::Queue& _getQueue(QueueType queueType) {
- return _queues[_enumToInt(queueType)];
- }
-
-
- QueueType _getQueueType(const AdmissionContext* admCtx) {
- auto priority = admCtx->getPriority();
- switch (priority) {
- case AdmissionContext::Priority::kLow:
- return QueueType::kLowPriority;
- case AdmissionContext::Priority::kNormal:
- return QueueType::kNormalPriority;
- case AdmissionContext::Priority::kImmediate:
- return QueueType::kImmediatePriority;
- default:
- MONGO_UNREACHABLE;
- }
- }
-
- ticket_queues::QueueMutex _queueMutex;
- std::array<ticket_queues::Queue, static_cast<unsigned int>(QueueType::QueueTypeSize)> _queues;
- std::array<QueueStats, static_cast<unsigned int>(QueueType::QueueTypeSize)> _stats;
-
- /**
- * Limits the number times the low priority queue is non-empty and bypassed in favor of the
- * normal priority queue for the next ticket admission.
- *
- * Updates must be done under the ticket_queues::UniqueLockGuard.
- */
- int _lowPriorityBypassThreshold;
-
- /**
- * Counts the number of times normal operations are dequeued over operations queued in the low
- * priority queue.
- */
- AtomicWord<std::uint64_t> _lowPriorityBypassCount{0};
-
- /**
- * Number of times ticket admission is expedited for low priority operations.
- */
- AtomicWord<std::int64_t> _expeditedLowPriorityAdmissions{0};
- AtomicWord<int> _ticketsAvailable;
- AtomicWord<int> _enqueuedElements;
ServiceContext* _serviceContext;
};
diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp
index a3ee1a14768..d6983df1dca 100644
--- a/src/mongo/util/concurrency/ticketholder_bm.cpp
+++ b/src/mongo/util/concurrency/ticketholder_bm.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/concurrency/locker_noop_client_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/util/concurrency/priority_ticketholder.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/tick_source_mock.h"
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index 3c7f8b1803c..ad08fc92b18 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -40,12 +40,13 @@
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/admission_context.h"
+#include "mongo/util/concurrency/priority_ticketholder.h"
+#include "mongo/util/concurrency/semaphore_ticketholder.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/tick_source_mock.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
namespace {
using namespace mongo;