From 868c7cb0a72c4f3e69d8ec2466000a6e6db5322b Mon Sep 17 00:00:00 2001 From: Haley Connelly Date: Mon, 14 Nov 2022 13:11:51 +0000 Subject: SERVER-71251 Move PriorityTicketHolder and SemaphoreTicketHolder into separate files --- src/mongo/db/concurrency/d_concurrency_test.cpp | 1 + src/mongo/db/storage/storage_engine_init.cpp | 2 + src/mongo/db/storage/ticketholder_manager.cpp | 2 + src/mongo/dbtests/threadedtests.cpp | 1 + src/mongo/util/concurrency/SConscript | 5 +- .../util/concurrency/priority_ticketholder.cpp | 241 ++++++++++++ src/mongo/util/concurrency/priority_ticketholder.h | 189 +++++++++ .../util/concurrency/semaphore_ticketholder.cpp | 236 ++++++++++++ .../util/concurrency/semaphore_ticketholder.h | 100 +++++ src/mongo/util/concurrency/ticketholder.cpp | 424 +-------------------- src/mongo/util/concurrency/ticketholder.h | 200 +--------- src/mongo/util/concurrency/ticketholder_bm.cpp | 2 + src/mongo/util/concurrency/ticketholder_test.cpp | 3 +- 13 files changed, 800 insertions(+), 606 deletions(-) create mode 100644 src/mongo/util/concurrency/priority_ticketholder.cpp create mode 100644 src/mongo/util/concurrency/priority_ticketholder.h create mode 100644 src/mongo/util/concurrency/semaphore_ticketholder.cpp create mode 100644 src/mongo/util/concurrency/semaphore_ticketholder.h diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index c8785d4c822..5a922bea581 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -42,6 +42,7 @@ #include "mongo/stdx/future.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/debug_util.h" #include "mongo/util/progress_meter.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/storage/storage_engine_init.cpp b/src/mongo/db/storage/storage_engine_init.cpp index aa0c0334a97..d93e307b8fc 100644 --- a/src/mongo/db/storage/storage_engine_init.cpp +++ b/src/mongo/db/storage/storage_engine_init.cpp @@ -50,6 +50,8 @@ #include "mongo/db/storage/storage_repair_observer.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/priority_ticketholder.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/storage/ticketholder_manager.cpp b/src/mongo/db/storage/ticketholder_manager.cpp index 6cf35ab2bd1..3457cd43655 100644 --- a/src/mongo/db/storage/ticketholder_manager.cpp +++ b/src/mongo/db/storage/ticketholder_manager.cpp @@ -29,6 +29,8 @@ #include "mongo/db/storage/ticketholder_manager.h" #include "mongo/logv2/log.h" +#include "mongo/util/concurrency/priority_ticketholder.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/concurrency/ticketholder.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index a8132efb631..942a1f7b262 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -41,6 +41,7 @@ #include "mongo/platform/atomic_word.h" #include "mongo/platform/bits.h" #include "mongo/stdx/thread.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/timer.h" diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index f8a514014b1..de664d85453 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -24,7 +24,10 @@ env.Library( env.Library( target='ticketholder', - source=['ticketholder.cpp', 'ticket_queues.cpp'], + source=[ + 'priority_ticketholder.cpp', 'semaphore_ticketholder.cpp', 'ticketholder.cpp', + 'ticket_queues.cpp' + ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/service_context', diff --git a/src/mongo/util/concurrency/priority_ticketholder.cpp b/src/mongo/util/concurrency/priority_ticketholder.cpp new file mode 100644 index 00000000000..ab234ee3c8f --- /dev/null +++ b/src/mongo/util/concurrency/priority_ticketholder.cpp @@ -0,0 +1,241 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_context.h" +#include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/concurrency/priority_ticketholder.h" + +#include + +#include "mongo/logv2/log.h" +#include "mongo/util/str.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +namespace mongo { +PriorityTicketHolder::PriorityTicketHolder(int numTickets, + int lowPriorityBypassThreshold, + ServiceContext* serviceContext) + : TicketHolderWithQueueingStats(numTickets, serviceContext), + _lowPriorityBypassThreshold(lowPriorityBypassThreshold), + _serviceContext(serviceContext) { + _ticketsAvailable.store(numTickets); + _enqueuedElements.store(0); +} + +void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold( + const int& newBypassThreshold) { + ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); + _lowPriorityBypassThreshold = newBypassThreshold; +} + +boost::optional PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { + invariant(admCtx); + // Low priority operations cannot use optimistic ticket acquisition and will go to the queue + // instead. This is done to prevent them from skipping the line before other high-priority + // operations. + if (admCtx->getPriority() >= AdmissionContext::Priority::kNormal) { + auto hasAcquired = _tryAcquireTicket(); + if (hasAcquired) { + return Ticket{this, admCtx}; + } + } + return boost::none; +} + +boost::optional PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) { + invariant(admCtx); + + auto queueType = _getQueueType(admCtx); + auto& queue = _getQueue(queueType); + + bool interruptible = waitMode == WaitMode::kInterruptible; + + _enqueuedElements.addAndFetch(1); + ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); + + ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); + do { + while (_ticketsAvailable.load() <= 0 || + _hasToWaitForHigherPriority(uniqueQueueLock, queueType)) { + bool hasTimedOut = !queue.enqueue(uniqueQueueLock, opCtx, until, interruptible); + if (hasTimedOut) { + return boost::none; + } + } + } while (!_tryAcquireTicket()); + return Ticket{this, admCtx}; +} + +void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { + // Tickets acquired with priority kImmediate are not generated from the pool of available + // tickets, and thus should never be returned to the pool of available tickets. + invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); + + // The idea behind the release mechanism consists of a consistent view of queued elements + // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will + // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking + // one, having increased the number of woken threads for accuracy. Once the thread gets woken it + // will then decrease the number of woken threads (as it has been woken) and then attempt to + // acquire a ticket. The two possible states are either one or more releasers releasing or a + // thread waking up due to the RW mutex. + // + // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on + // them, only waking threads is allowed. + ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); + _ticketsAvailable.addAndFetch(1); + _dequeueWaitingThread(sharedQueueLock); +} + +void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { + auto difference = newSize - oldSize; + + _ticketsAvailable.fetchAndAdd(difference); + + if (difference > 0) { + // As we're adding tickets the waiting threads need to be notified that there are new + // tickets available. + ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); + for (int i = 0; i < difference; i++) { + _dequeueWaitingThread(sharedQueueLock); + } + } + + // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll + // have to wait until the current ticket holders release their tickets. +} + +TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( + const AdmissionContext* admCtx) noexcept { + auto queueType = _getQueueType(admCtx); + return _stats[_enumToInt(queueType)]; +} + +void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { + { + BSONObjBuilder bbb(b.subobjStart("lowPriority")); + const auto& lowPriorityTicketStats = _stats[_enumToInt(QueueType::kLowPriority)]; + _appendCommonQueueImplStats(bbb, lowPriorityTicketStats); + bbb.append("expedited", expedited()); + bbb.append("bypassed", bypassed()); + bbb.done(); + } + { + BSONObjBuilder bbb(b.subobjStart("normalPriority")); + const auto& normalPriorityTicketStats = _stats[_enumToInt(QueueType::kNormalPriority)]; + _appendCommonQueueImplStats(bbb, normalPriorityTicketStats); + bbb.done(); + } + { + BSONObjBuilder bbb(b.subobjStart("immediatePriority")); + // Since 'kImmediate' priority operations will never queue, omit queueing statistics that + // will always be 0. + const auto& immediateTicketStats = _stats[_enumToInt(QueueType::kImmediatePriority)]; + + auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed(); + auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed(); + bbb.append("startedProcessing", started); + bbb.append("processing", std::max(static_cast(started - finished), 0)); + bbb.append("finishedProcessing", finished); + bbb.append("totalTimeProcessingMicros", + immediateTicketStats.totalTimeProcessingMicros.loadRelaxed()); + bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed()); + bbb.done(); + } +} + +bool PriorityTicketHolder::_tryAcquireTicket() { + auto remaining = _ticketsAvailable.subtractAndFetch(1); + if (remaining < 0) { + _ticketsAvailable.addAndFetch(1); + return false; + } + return true; +} + +void PriorityTicketHolder::_dequeueWaitingThread( + const ticket_queues::SharedLockGuard& sharedQueueLock) { + // There are only 2 possible queues to dequeue from - the low priority and normal priority + // queues. There will never be anything to dequeue from the immediate priority queue, given + // immediate priority operations will never wait for ticket admission. + auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); + auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); + + // There is a guarantee that the number of queued elements cannot change while holding the + // shared queue lock. + auto lowQueueCount = lowPriorityQueue.queuedElems(); + auto normalQueueCount = normalPriorityQueue.queuedElems(); + + if (lowQueueCount == 0 && normalQueueCount == 0) { + return; + } + if (lowQueueCount == 0) { + normalPriorityQueue.attemptToDequeue(sharedQueueLock); + return; + } + if (normalQueueCount == 0) { + lowPriorityQueue.attemptToDequeue(sharedQueueLock); + return; + } + + // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the + // normal priority queue until the bypass threshold is met. + if (_lowPriorityBypassThreshold > 0 && + _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) { + if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) { + _expeditedLowPriorityAdmissions.addAndFetch(1); + } else { + normalPriorityQueue.attemptToDequeue(sharedQueueLock); + } + return; + } + + if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) { + lowPriorityQueue.attemptToDequeue(sharedQueueLock); + } +} + +bool PriorityTicketHolder::_hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, + QueueType queue) { + switch (queue) { + case QueueType::kLowPriority: { + const auto& normalQueue = _getQueue(QueueType::kNormalPriority); + auto pending = normalQueue.getThreadsPendingToWake(); + return pending != 0 && pending >= _ticketsAvailable.load(); + } + default: + return false; + } +} +} // namespace mongo diff --git a/src/mongo/util/concurrency/priority_ticketholder.h b/src/mongo/util/concurrency/priority_ticketholder.h new file mode 100644 index 00000000000..274fcb661c4 --- /dev/null +++ b/src/mongo/util/concurrency/priority_ticketholder.h @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/concurrency/mutex.h" +#include "mongo/util/concurrency/ticket_queues.h" +#include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class Ticket; + +/** + * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get + * placed in a specific internal queue according to some logic. Releasers will wake up a waiter + * from a group chosen according to some logic. + */ +class PriorityTicketHolder : public TicketHolderWithQueueingStats { +public: + explicit PriorityTicketHolder(int numTickets, + int lowPriorityBypassThreshold, + ServiceContext* serviceContext); + ~PriorityTicketHolder() override{}; + + int available() const override final { + return _ticketsAvailable.load(); + }; + + int queued() const override final { + return _enqueuedElements.loadRelaxed(); + } + + bool recordImmediateTicketStatistics() noexcept override final { + return true; + }; + + /** + * Number of times low priority operations are expedited for ticket admission over normal + * priority operations. + */ + std::int64_t expedited() const { + return _expeditedLowPriorityAdmissions.loadRelaxed(); + } + + /** + * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the + * normal priority queue when a ticket becomes available. + */ + std::int64_t bypassed() const { + return _lowPriorityBypassCount.loadRelaxed(); + }; + + void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); + +private: + enum class QueueType : unsigned int { + kLowPriority = 0, + kNormalPriority = 1, + // Exclusively used for statistics tracking. This queue should never have any processes + // 'queued'. + kImmediatePriority = 2, + QueueTypeSize = 3 + }; + + boost::optional _tryAcquireImpl(AdmissionContext* admCtx) override final; + + boost::optional _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) override final; + + void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; + + void _resize(int newSize, int oldSize) noexcept override final; + + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; + + void _appendImplStats(BSONObjBuilder& b) const override final; + + bool _tryAcquireTicket(); + + /** + * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket. + * Implementors MUST wake at least one waiting thread if at least one thread is pending to be + * woken between all the queues. In other words, attemptToDequeue on each non-empty Queue must + * be called until either it returns true at least once or has been called on all queues. + * + * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block. + * + * When called the following invariants will be held: + * - The number of items in each queue will not change during the execution + * - No other thread will proceed to wait during the execution of the method + */ + void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock); + + /** + * Returns whether there are higher priority threads pending to get a ticket in front of the + * given queue type and not enough tickets for all of them. + */ + bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType); + + unsigned int _enumToInt(QueueType queueType) { + return static_cast(queueType); + } + unsigned int _enumToInt(QueueType queueType) const { + return static_cast(queueType); + } + + ticket_queues::Queue& _getQueue(QueueType queueType) { + return _queues[_enumToInt(queueType)]; + } + + + QueueType _getQueueType(const AdmissionContext* admCtx) { + auto priority = admCtx->getPriority(); + switch (priority) { + case AdmissionContext::Priority::kLow: + return QueueType::kLowPriority; + case AdmissionContext::Priority::kNormal: + return QueueType::kNormalPriority; + case AdmissionContext::Priority::kImmediate: + return QueueType::kImmediatePriority; + default: + MONGO_UNREACHABLE; + } + } + + ticket_queues::QueueMutex _queueMutex; + std::array(QueueType::QueueTypeSize)> _queues; + std::array(QueueType::QueueTypeSize)> _stats; + + /** + * Limits the number times the low priority queue is non-empty and bypassed in favor of the + * normal priority queue for the next ticket admission. + * + * Updates must be done under the ticket_queues::UniqueLockGuard. + */ + int _lowPriorityBypassThreshold; + + /** + * Counts the number of times normal operations are dequeued over operations queued in the low + * priority queue. + */ + AtomicWord _lowPriorityBypassCount{0}; + + /** + * Number of times ticket admission is expedited for low priority operations. + */ + AtomicWord _expeditedLowPriorityAdmissions{0}; + AtomicWord _ticketsAvailable; + AtomicWord _enqueuedElements; + ServiceContext* _serviceContext; +}; +} // namespace mongo diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.cpp b/src/mongo/util/concurrency/semaphore_ticketholder.cpp new file mode 100644 index 00000000000..56e6d84c29f --- /dev/null +++ b/src/mongo/util/concurrency/semaphore_ticketholder.cpp @@ -0,0 +1,236 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_context.h" +#include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" +#include "mongo/util/concurrency/ticketholder.h" + +#include + +#include "mongo/logv2/log.h" +#include "mongo/util/str.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +namespace mongo { + +void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { + _appendCommonQueueImplStats(b, _semaphoreStats); +} +#if defined(__linux__) +namespace { + +/** + * Accepts an errno code, prints its error message, and exits. + */ +void failWithErrno(int err) { + LOGV2_FATAL(28604, + "error in Ticketholder: {errnoWithDescription_err}", + "errnoWithDescription_err"_attr = errorMessage(posixError(err))); +} + +/* + * Checks the return value from a Linux semaphore function call, and fails with the set errno if the + * call was unsucessful. + */ +void check(int ret) { + if (ret == 0) + return; + failWithErrno(errno); +} + +/** + * Takes a Date_t deadline and sets the appropriate values in a timespec structure. + */ +void tsFromDate(const Date_t& deadline, struct timespec& ts) { + ts.tv_sec = deadline.toTimeT(); + ts.tv_nsec = (deadline.toMillisSinceEpoch() % 1000) * 1'000'000; +} +} // namespace + +SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext) + : TicketHolderWithQueueingStats(numTickets, serviceContext) { + check(sem_init(&_sem, 0, numTickets)); +} + +SemaphoreTicketHolder::~SemaphoreTicketHolder() { + check(sem_destroy(&_sem)); +} + +boost::optional SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { + while (0 != sem_trywait(&_sem)) { + if (errno == EAGAIN) + return boost::none; + if (errno != EINTR) + failWithErrno(errno); + } + return Ticket{this, admCtx}; +} + +boost::optional SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) { + const Milliseconds intervalMs(500); + struct timespec ts; + + // To support interrupting ticket acquisition while still benefiting from semaphores, we do a + // timed wait on an interval to periodically check for interrupts. + // The wait period interval is the smaller of the default interval and the provided + // deadline. + Date_t deadline = std::min(until, Date_t::now() + intervalMs); + tsFromDate(deadline, ts); + + while (0 != sem_timedwait(&_sem, &ts)) { + if (errno == ETIMEDOUT) { + // If we reached the deadline without being interrupted, we have completely timed out. + if (deadline == until) + return boost::none; + + deadline = std::min(until, Date_t::now() + intervalMs); + tsFromDate(deadline, ts); + } else if (errno != EINTR) { + failWithErrno(errno); + } + + // To correctly handle errors from sem_timedwait, we should check for interrupts last. + // It is possible to unset 'errno' after a call to checkForInterrupt(). + if (waitMode == WaitMode::kInterruptible) + opCtx->checkForInterrupt(); + } + return Ticket{this, admCtx}; +} + +void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { + check(sem_post(&_sem)); +} + +int SemaphoreTicketHolder::available() const { + int val = 0; + check(sem_getvalue(&_sem, &val)); + return val; +} + +void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { + auto difference = newSize - oldSize; + + if (difference > 0) { + for (int i = 0; i < difference; i++) { + check(sem_post(&_sem)); + } + } else if (difference < 0) { + for (int i = 0; i < -difference; i++) { + check(sem_wait(&_sem)); + } + } +} + +#else + +SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* svcCtx) + : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {} + +SemaphoreTicketHolder::~SemaphoreTicketHolder() = default; + +boost::optional SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { + stdx::lock_guard lk(_mutex); + if (!_tryAcquire()) { + return boost::none; + } + return Ticket{this, admCtx}; +} + +boost::optional SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) { + stdx::unique_lock lk(_mutex); + + bool taken = [&] { + if (waitMode == WaitMode::kInterruptible) { + return opCtx->waitForConditionOrInterruptUntil( + _newTicket, lk, until, [this] { return _tryAcquire(); }); + } else { + if (until == Date_t::max()) { + _newTicket.wait(lk, [this] { return _tryAcquire(); }); + return true; + } else { + return _newTicket.wait_until( + lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); }); + } + } + }(); + if (!taken) { + return boost::none; + } + return Ticket{this, admCtx}; +} + +void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { + { + stdx::lock_guard lk(_mutex); + _numTickets++; + } + _newTicket.notify_one(); +} + +int SemaphoreTicketHolder::available() const { + return _numTickets; +} + +bool SemaphoreTicketHolder::_tryAcquire() { + if (_numTickets <= 0) { + if (_numTickets < 0) { + std::cerr << "DISASTER! in TicketHolder" << std::endl; + } + return false; + } + _numTickets--; + return true; +} + +void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { + auto difference = newSize - oldSize; + + stdx::lock_guard lk(_mutex); + _numTickets += difference; + + if (difference > 0) { + for (int i = 0; i < difference; i++) { + _newTicket.notify_one(); + } + } + // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll + // have to wait until the current ticket holders release their tickets. +} +#endif +} // namespace mongo diff --git a/src/mongo/util/concurrency/semaphore_ticketholder.h b/src/mongo/util/concurrency/semaphore_ticketholder.h new file mode 100644 index 00000000000..56606c3da2e --- /dev/null +++ b/src/mongo/util/concurrency/semaphore_ticketholder.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#if defined(__linux__) +#include +#endif + +#include + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/concurrency/mutex.h" +#include "mongo/util/concurrency/ticket_queues.h" +#include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats { +public: + explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext); + ~SemaphoreTicketHolder() override final; + + int available() const override final; + + int queued() const override final { + auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); + auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); + return std::max(static_cast(added - removed), 0); + }; + + bool recordImmediateTicketStatistics() noexcept override final { + // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing + // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked + // in the statistics. + return false; + } + +private: + boost::optional _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) override final; + + boost::optional _tryAcquireImpl(AdmissionContext* admCtx) override final; + void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; + + void _appendImplStats(BSONObjBuilder& b) const override final; + + void _resize(int newSize, int oldSize) noexcept override final; + + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final { + return _semaphoreStats; + } +#if defined(__linux__) + mutable sem_t _sem; + +#else + bool _tryAcquire(); + + int _numTickets; + Mutex _mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex"); + stdx::condition_variable _newTicket; +#endif + QueueStats _semaphoreStats; +}; + +} // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index eafed4da466..33e8c05b2c5 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -63,29 +63,6 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, admCtx->start(serviceContext->getTickSource()); queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); } - -/** - * Appends the standard statistics stored in QueueStats to BSONObjBuilder b; - */ -void appendCommonQueueImplStats(BSONObjBuilder& b, - const TicketHolderWithQueueingStats::QueueStats& stats) { - auto removed = stats.totalRemovedQueue.loadRelaxed(); - auto added = stats.totalAddedQueue.loadRelaxed(); - - b.append("addedToQueue", added); - b.append("removedFromQueue", removed); - b.append("queueLength", std::max(static_cast(added - removed), 0)); - - auto finished = stats.totalFinishedProcessing.loadRelaxed(); - auto started = stats.totalStartedProcessing.loadRelaxed(); - b.append("startedProcessing", started); - b.append("processing", std::max(static_cast(started - finished), 0)); - b.append("finishedProcessing", finished); - b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); - b.append("canceled", stats.totalCanceled.loadRelaxed()); - b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); - b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); -} } // namespace Ticket TicketHolderWithQueueingStats::acquireImmediateTicket(AdmissionContext* admCtx) { @@ -187,393 +164,24 @@ boost::optional TicketHolderWithQueueingStats::waitForTicketUntil(Operat } } -void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - appendCommonQueueImplStats(b, _semaphoreStats); -} -#if defined(__linux__) -namespace { - -/** - * Accepts an errno code, prints its error message, and exits. - */ -void failWithErrno(int err) { - LOGV2_FATAL(28604, - "error in Ticketholder: {errnoWithDescription_err}", - "errnoWithDescription_err"_attr = errorMessage(posixError(err))); -} - -/* - * Checks the return value from a Linux semaphore function call, and fails with the set errno if the - * call was unsucessful. - */ -void check(int ret) { - if (ret == 0) - return; - failWithErrno(errno); -} - -/** - * Takes a Date_t deadline and sets the appropriate values in a timespec structure. - */ -void tsFromDate(const Date_t& deadline, struct timespec& ts) { - ts.tv_sec = deadline.toTimeT(); - ts.tv_nsec = (deadline.toMillisSinceEpoch() % 1000) * 1'000'000; -} -} // namespace - -SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext) { - check(sem_init(&_sem, 0, numTickets)); -} - -SemaphoreTicketHolder::~SemaphoreTicketHolder() { - check(sem_destroy(&_sem)); -} - -boost::optional SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - while (0 != sem_trywait(&_sem)) { - if (errno == EAGAIN) - return boost::none; - if (errno != EINTR) - failWithErrno(errno); - } - return Ticket{this, admCtx}; -} - -boost::optional SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - const Milliseconds intervalMs(500); - struct timespec ts; - - // To support interrupting ticket acquisition while still benefiting from semaphores, we do a - // timed wait on an interval to periodically check for interrupts. - // The wait period interval is the smaller of the default interval and the provided - // deadline. - Date_t deadline = std::min(until, Date_t::now() + intervalMs); - tsFromDate(deadline, ts); - - while (0 != sem_timedwait(&_sem, &ts)) { - if (errno == ETIMEDOUT) { - // If we reached the deadline without being interrupted, we have completely timed out. - if (deadline == until) - return boost::none; - - deadline = std::min(until, Date_t::now() + intervalMs); - tsFromDate(deadline, ts); - } else if (errno != EINTR) { - failWithErrno(errno); - } - - // To correctly handle errors from sem_timedwait, we should check for interrupts last. - // It is possible to unset 'errno' after a call to checkForInterrupt(). - if (waitMode == WaitMode::kInterruptible) - opCtx->checkForInterrupt(); - } - return Ticket{this, admCtx}; -} - -void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - check(sem_post(&_sem)); -} - -int SemaphoreTicketHolder::available() const { - int val = 0; - check(sem_getvalue(&_sem, &val)); - return val; -} - -void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - if (difference > 0) { - for (int i = 0; i < difference; i++) { - check(sem_post(&_sem)); - } - } else if (difference < 0) { - for (int i = 0; i < -difference; i++) { - check(sem_wait(&_sem)); - } - } -} - -#else - -SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* svcCtx) - : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {} - -SemaphoreTicketHolder::~SemaphoreTicketHolder() = default; - -boost::optional SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - stdx::lock_guard lk(_mutex); - if (!_tryAcquire()) { - return boost::none; - } - return Ticket{this, admCtx}; -} - -boost::optional SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - stdx::unique_lock lk(_mutex); - - bool taken = [&] { - if (waitMode == WaitMode::kInterruptible) { - return opCtx->waitForConditionOrInterruptUntil( - _newTicket, lk, until, [this] { return _tryAcquire(); }); - } else { - if (until == Date_t::max()) { - _newTicket.wait(lk, [this] { return _tryAcquire(); }); - return true; - } else { - return _newTicket.wait_until( - lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); }); - } - } - }(); - if (!taken) { - return boost::none; - } - return Ticket{this, admCtx}; -} - -void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - { - stdx::lock_guard lk(_mutex); - _numTickets++; - } - _newTicket.notify_one(); -} - -int SemaphoreTicketHolder::available() const { - return _numTickets; -} - -bool SemaphoreTicketHolder::_tryAcquire() { - if (_numTickets <= 0) { - if (_numTickets < 0) { - std::cerr << "DISASTER! in TicketHolder" << std::endl; - } - return false; - } - _numTickets--; - return true; -} - -void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - stdx::lock_guard lk(_mutex); - _numTickets += difference; - - if (difference > 0) { - for (int i = 0; i < difference; i++) { - _newTicket.notify_one(); - } - } - // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll - // have to wait until the current ticket holders release their tickets. -} -#endif - -PriorityTicketHolder::PriorityTicketHolder(int numTickets, - int lowPriorityBypassThreshold, - ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext), - _lowPriorityBypassThreshold(lowPriorityBypassThreshold), - _serviceContext(serviceContext) { - _ticketsAvailable.store(numTickets); - _enqueuedElements.store(0); -} - -void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold( - const int& newBypassThreshold) { - ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); - _lowPriorityBypassThreshold = newBypassThreshold; -} - -boost::optional PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - invariant(admCtx); - // Low priority operations cannot use optimistic ticket acquisition and will go to the queue - // instead. This is done to prevent them from skipping the line before other high-priority - // operations. - if (admCtx->getPriority() >= AdmissionContext::Priority::kNormal) { - auto hasAcquired = _tryAcquireTicket(); - if (hasAcquired) { - return Ticket{this, admCtx}; - } - } - return boost::none; -} - -boost::optional PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - invariant(admCtx); - - auto queueType = _getQueueType(admCtx); - auto& queue = _getQueue(queueType); - - bool interruptible = waitMode == WaitMode::kInterruptible; - - _enqueuedElements.addAndFetch(1); - ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - - ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); - do { - while (_ticketsAvailable.load() <= 0 || - _hasToWaitForHigherPriority(uniqueQueueLock, queueType)) { - bool hasTimedOut = !queue.enqueue(uniqueQueueLock, opCtx, until, interruptible); - if (hasTimedOut) { - return boost::none; - } - } - } while (!_tryAcquireTicket()); - return Ticket{this, admCtx}; -} - -void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - // Tickets acquired with priority kImmediate are not generated from the pool of available - // tickets, and thus should never be returned to the pool of available tickets. - invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); - - // The idea behind the release mechanism consists of a consistent view of queued elements - // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will - // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking - // one, having increased the number of woken threads for accuracy. Once the thread gets woken it - // will then decrease the number of woken threads (as it has been woken) and then attempt to - // acquire a ticket. The two possible states are either one or more releasers releasing or a - // thread waking up due to the RW mutex. - // - // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on - // them, only waking threads is allowed. - ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); - _ticketsAvailable.addAndFetch(1); - _dequeueWaitingThread(sharedQueueLock); -} - -void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - _ticketsAvailable.fetchAndAdd(difference); - - if (difference > 0) { - // As we're adding tickets the waiting threads need to be notified that there are new - // tickets available. - ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); - for (int i = 0; i < difference; i++) { - _dequeueWaitingThread(sharedQueueLock); - } - } - - // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll - // have to wait until the current ticket holders release their tickets. -} - -TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( - const AdmissionContext* admCtx) noexcept { - auto queueType = _getQueueType(admCtx); - return _stats[_enumToInt(queueType)]; -} - -void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - { - BSONObjBuilder bbb(b.subobjStart("lowPriority")); - const auto& lowPriorityTicketStats = _stats[_enumToInt(QueueType::kLowPriority)]; - appendCommonQueueImplStats(bbb, lowPriorityTicketStats); - bbb.append("expedited", expedited()); - bbb.append("bypassed", bypassed()); - bbb.done(); - } - { - BSONObjBuilder bbb(b.subobjStart("normalPriority")); - const auto& normalPriorityTicketStats = _stats[_enumToInt(QueueType::kNormalPriority)]; - appendCommonQueueImplStats(bbb, normalPriorityTicketStats); - bbb.done(); - } - { - BSONObjBuilder bbb(b.subobjStart("immediatePriority")); - // Since 'kImmediate' priority operations will never queue, omit queueing statistics that - // will always be 0. - const auto& immediateTicketStats = _stats[_enumToInt(QueueType::kImmediatePriority)]; - - auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed(); - auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed(); - bbb.append("startedProcessing", started); - bbb.append("processing", std::max(static_cast(started - finished), 0)); - bbb.append("finishedProcessing", finished); - bbb.append("totalTimeProcessingMicros", - immediateTicketStats.totalTimeProcessingMicros.loadRelaxed()); - bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed()); - bbb.done(); - } -} - -bool PriorityTicketHolder::_tryAcquireTicket() { - auto remaining = _ticketsAvailable.subtractAndFetch(1); - if (remaining < 0) { - _ticketsAvailable.addAndFetch(1); - return false; - } - return true; -} - -void PriorityTicketHolder::_dequeueWaitingThread( - const ticket_queues::SharedLockGuard& sharedQueueLock) { - // There are only 2 possible queues to dequeue from - the low priority and normal priority - // queues. There will never be anything to dequeue from the immediate priority queue, given - // immediate priority operations will never wait for ticket admission. - auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); - auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); - - // There is a guarantee that the number of queued elements cannot change while holding the - // shared queue lock. - auto lowQueueCount = lowPriorityQueue.queuedElems(); - auto normalQueueCount = normalPriorityQueue.queuedElems(); - - if (lowQueueCount == 0 && normalQueueCount == 0) { - return; - } - if (lowQueueCount == 0) { - normalPriorityQueue.attemptToDequeue(sharedQueueLock); - return; - } - if (normalQueueCount == 0) { - lowPriorityQueue.attemptToDequeue(sharedQueueLock); - return; - } - - // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the - // normal priority queue until the bypass threshold is met. - if (_lowPriorityBypassThreshold > 0 && - _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) { - if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) { - _expeditedLowPriorityAdmissions.addAndFetch(1); - } else { - normalPriorityQueue.attemptToDequeue(sharedQueueLock); - } - return; - } +void TicketHolderWithQueueingStats::_appendCommonQueueImplStats(BSONObjBuilder& b, + const QueueStats& stats) const { + auto removed = stats.totalRemovedQueue.loadRelaxed(); + auto added = stats.totalAddedQueue.loadRelaxed(); - if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) { - lowPriorityQueue.attemptToDequeue(sharedQueueLock); - } -} + b.append("addedToQueue", added); + b.append("removedFromQueue", removed); + b.append("queueLength", std::max(static_cast(added - removed), 0)); -bool PriorityTicketHolder::_hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, - QueueType queue) { - switch (queue) { - case QueueType::kLowPriority: { - const auto& normalQueue = _getQueue(QueueType::kNormalPriority); - auto pending = normalQueue.getThreadsPendingToWake(); - return pending != 0 && pending >= _ticketsAvailable.load(); - } - default: - return false; - } + auto finished = stats.totalFinishedProcessing.loadRelaxed(); + auto started = stats.totalStartedProcessing.loadRelaxed(); + b.append("startedProcessing", started); + b.append("processing", std::max(static_cast(started - finished), 0)); + b.append("finishedProcessing", finished); + b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); + b.append("canceled", stats.totalCanceled.loadRelaxed()); + b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); + b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); } } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index a87dacca15f..5d16ef1ae25 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -28,12 +28,6 @@ */ #pragma once -#if defined(__linux__) -#include -#endif - -#include - #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" @@ -41,13 +35,14 @@ #include "mongo/stdx/future.h" #include "mongo/util/concurrency/admission_context.h" #include "mongo/util/concurrency/mutex.h" -#include "mongo/util/concurrency/ticket_queues.h" #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/time_support.h" namespace mongo { class Ticket; +class PriorityTicketHolder; +class SemaphoreTicketHolder; /** * Maintains and distributes tickets across operations from a limited pool of tickets. The ticketing @@ -224,198 +219,11 @@ private: AtomicWord _outof; protected: - ServiceContext* _serviceContext; -}; - -class SemaphoreTicketHolder final : public TicketHolderWithQueueingStats { -public: - explicit SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext); - ~SemaphoreTicketHolder() override final; - - int available() const override final; - - int queued() const override final { - auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); - auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); - return std::max(static_cast(added - removed), 0); - }; - - bool recordImmediateTicketStatistics() noexcept override final { - // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing - // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked - // in the statistics. - return false; - } - -private: - boost::optional _waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) override final; - - boost::optional _tryAcquireImpl(AdmissionContext* admCtx) override final; - void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; - - void _appendImplStats(BSONObjBuilder& b) const override final; - - void _resize(int newSize, int oldSize) noexcept override final; - - QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final { - return _semaphoreStats; - } -#if defined(__linux__) - mutable sem_t _sem; - -#else - bool _tryAcquire(); - - int _numTickets; - Mutex _mutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex"); - stdx::condition_variable _newTicket; -#endif - QueueStats _semaphoreStats; -}; - -/** - * A ticketholder implementation that centralises all ticket acquisition/releases. Waiters will get - * placed in a specific internal queue according to some logic. Releasers will wake up a waiter - * from a group chosen according to some logic. - */ -class PriorityTicketHolder : public TicketHolderWithQueueingStats { -public: - explicit PriorityTicketHolder(int numTickets, - int lowPriorityBypassThreshold, - ServiceContext* serviceContext); - ~PriorityTicketHolder() override{}; - - int available() const override final { - return _ticketsAvailable.load(); - }; - - int queued() const override final { - return _enqueuedElements.loadRelaxed(); - } - - bool recordImmediateTicketStatistics() noexcept override final { - return true; - }; - /** - * Number of times low priority operations are expedited for ticket admission over normal - * priority operations. + * Appends the standard statistics stored in QueueStats to BSONObjBuilder b; */ - std::int64_t expedited() const { - return _expeditedLowPriorityAdmissions.loadRelaxed(); - } + void _appendCommonQueueImplStats(BSONObjBuilder& b, const QueueStats& stats) const; - /** - * Returns the number of times the low priority queue is bypassed in favor of dequeuing from the - * normal priority queue when a ticket becomes available. - */ - std::int64_t bypassed() const { - return _lowPriorityBypassCount.loadRelaxed(); - }; - - void updateLowPriorityAdmissionBypassThreshold(const int& newBypassThreshold); - -private: - enum class QueueType : unsigned int { - kLowPriority = 0, - kNormalPriority = 1, - // Exclusively used for statistics tracking. This queue should never have any processes - // 'queued'. - kImmediatePriority = 2, - QueueTypeSize = 3 - }; - - boost::optional _tryAcquireImpl(AdmissionContext* admCtx) override final; - - boost::optional _waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) override final; - - void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; - - void _resize(int newSize, int oldSize) noexcept override final; - - QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; - - void _appendImplStats(BSONObjBuilder& b) const override final; - - bool _tryAcquireTicket(); - - /** - * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket. - * Implementors MUST wake at least one waiting thread if at least one thread is pending to be - * woken between all the queues. In other words, attemptToDequeue on each non-empty Queue must - * be called until either it returns true at least once or has been called on all queues. - * - * Care must be taken to ensure that only CPU-bound work is performed here and it doesn't block. - * - * When called the following invariants will be held: - * - The number of items in each queue will not change during the execution - * - No other thread will proceed to wait during the execution of the method - */ - void _dequeueWaitingThread(const ticket_queues::SharedLockGuard& sharedQueueLock); - - /** - * Returns whether there are higher priority threads pending to get a ticket in front of the - * given queue type and not enough tickets for all of them. - */ - bool _hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, QueueType queueType); - - unsigned int _enumToInt(QueueType queueType) { - return static_cast(queueType); - } - unsigned int _enumToInt(QueueType queueType) const { - return static_cast(queueType); - } - - ticket_queues::Queue& _getQueue(QueueType queueType) { - return _queues[_enumToInt(queueType)]; - } - - - QueueType _getQueueType(const AdmissionContext* admCtx) { - auto priority = admCtx->getPriority(); - switch (priority) { - case AdmissionContext::Priority::kLow: - return QueueType::kLowPriority; - case AdmissionContext::Priority::kNormal: - return QueueType::kNormalPriority; - case AdmissionContext::Priority::kImmediate: - return QueueType::kImmediatePriority; - default: - MONGO_UNREACHABLE; - } - } - - ticket_queues::QueueMutex _queueMutex; - std::array(QueueType::QueueTypeSize)> _queues; - std::array(QueueType::QueueTypeSize)> _stats; - - /** - * Limits the number times the low priority queue is non-empty and bypassed in favor of the - * normal priority queue for the next ticket admission. - * - * Updates must be done under the ticket_queues::UniqueLockGuard. - */ - int _lowPriorityBypassThreshold; - - /** - * Counts the number of times normal operations are dequeued over operations queued in the low - * priority queue. - */ - AtomicWord _lowPriorityBypassCount{0}; - - /** - * Number of times ticket admission is expedited for low priority operations. - */ - AtomicWord _expeditedLowPriorityAdmissions{0}; - AtomicWord _ticketsAvailable; - AtomicWord _enqueuedElements; ServiceContext* _serviceContext; }; diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp index a3ee1a14768..d6983df1dca 100644 --- a/src/mongo/util/concurrency/ticketholder_bm.cpp +++ b/src/mongo/util/concurrency/ticketholder_bm.cpp @@ -35,6 +35,8 @@ #include "mongo/db/concurrency/locker_noop_client_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/util/concurrency/priority_ticketholder.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/tick_source_mock.h" diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index 3c7f8b1803c..ad08fc92b18 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -40,12 +40,13 @@ #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/admission_context.h" +#include "mongo/util/concurrency/priority_ticketholder.h" +#include "mongo/util/concurrency/semaphore_ticketholder.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/tick_source_mock.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - namespace { using namespace mongo; -- cgit v1.2.1