diff options
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 90 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 11 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_bm.cpp | 145 |
4 files changed, 210 insertions, 47 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index 607f155aae8..4a737483d8a 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -58,3 +58,14 @@ env.CppUnitTest( 'ticketholder', ] ) + +env.Benchmark( + target='ticketholder_bm', + source=[ + 'ticketholder_bm.cpp', + ], + LIBDEPS=[ + 'ticketholder', + ], +) + diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index e46ff7c0c0f..4043eea04b7 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -262,61 +262,63 @@ bool SemaphoreTicketHolder::_tryAcquire() { } #endif -FifoTicketHolder::FifoTicketHolder(int num) - : _capacity(num), _numAvailable(num), _elementsInQueue(0) {} +FifoTicketHolder::FifoTicketHolder(int num) : _capacity(num) { + _ticketsAvailable.store(num); + _enqueuedElements.store(0); +} FifoTicketHolder::~FifoTicketHolder() {} int FifoTicketHolder::available() const { - return _numAvailable.load(); + return _ticketsAvailable.load(); } int FifoTicketHolder::used() const { - return _capacity.load() - _numAvailable.load(); + return outof() - available(); } int FifoTicketHolder::outof() const { return _capacity.load(); } -void FifoTicketHolder::_release(WithLock) { - auto newAvailable = _numAvailable.addAndFetch(1); - // This is not an optimization but defensively programming against possible edge cases that we - // haven't thought of. For example, if we have the situation where we have N tickets available - // but something is in the queue, we must push it to the end of the queue. In this case having a - // batch release process would be beneficial as it would remove as many elements as it can from - // the queue, leading us back to the fast ticket path in the normal case. - while (newAvailable > 0) { - auto waitingElement = _queue.tryPop(); - if (waitingElement) { - auto& elem = *waitingElement; - _elementsInQueue.subtractAndFetch(1); - stdx::lock_guard lk(elem->modificationMutex); - if (elem->state != WaitingState::Waiting) { - // If the operation has already been finalized we skip the element and don't assign - // a ticket. - continue; +void FifoTicketHolder::release() { + stdx::lock_guard lk(_queueMutex); + // This loop will most of the time be executed only once. In case some operations in the + // queue have been cancelled or already took a ticket the releasing operation should search for + // a waiting operation to avoid leaving an operation waiting indefinitely. + while (true) { + if (!_queue.empty()) { + auto& elem = _queue.front(); + _enqueuedElements.subtractAndFetch(1); + { + stdx::lock_guard elemLk(elem->modificationMutex); + if (elem->state != WaitingState::Waiting) { + // If the operation has already been finalized we skip the element and don't + // assign a ticket. + _queue.pop(); + continue; + } + elem->state = WaitingState::Assigned; } - elem->state = WaitingState::Assigned; elem->signaler.notify_all(); - newAvailable = _numAvailable.subtractAndFetch(1); + _queue.pop(); } else { - return; + _ticketsAvailable.addAndFetch(1); } + return; } -} // namespace mongo - -void FifoTicketHolder::release() { - stdx::lock_guard lk(_queueMutex); - _release(lk); } bool FifoTicketHolder::tryAcquire() { - stdx::lock_guard lk(_queueMutex); - if (_numAvailable.load() > 0 && _elementsInQueue.load() == 0) { - _numAvailable.subtractAndFetch(1); - return true; - } else { + auto queued = _enqueuedElements.load(); + if (queued > 0) + return false; + + auto remaining = _ticketsAvailable.subtractAndFetch(1); + if (remaining < 0) { + _ticketsAvailable.addAndFetch(1); return false; } + + return true; } void FifoTicketHolder::waitForTicket(OperationContext* opCtx) { @@ -333,14 +335,19 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) waitingElement->state = WaitingState::Waiting; { stdx::lock_guard lk(_queueMutex); - if (opCtx) { - _queue.push(std::shared_ptr(waitingElement), opCtx); - } else { - _queue.push(std::shared_ptr(waitingElement)); + _enqueuedElements.addAndFetch(1); + // Check for available tickets under the queue lock, in case a ticket has just been + // released. + auto remaining = _ticketsAvailable.subtractAndFetch(1); + if (remaining >= 0) { + _enqueuedElements.subtractAndFetch(1); + return true; } - _elementsInQueue.addAndFetch(1); + _ticketsAvailable.addAndFetch(1); + // We copy-construct the shared_ptr here as the waiting element needs to be alive in both + // release() and waitForTicket(). Otherwise the code could lead to a segmentation fault + _queue.emplace(waitingElement); } - ScopeGuard cancelWait([&] { bool hasAssignedTicket = false; { @@ -351,8 +358,7 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) if (hasAssignedTicket) { // To cover the edge case of getting a ticket assigned before cancelling the ticket // request. As we have been granted a ticket we must release it. - stdx::lock_guard queueLock(_queueMutex); - _release(queueLock); + release(); } }); diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index d0dcfcb746b..b4cf246bdac 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -32,13 +32,14 @@ #include <semaphore.h> #endif +#include <queue> + #include "mongo/db/operation_context.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/future.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/hierarchical_acquisition.h" -#include "mongo/util/producer_consumer_queue.h" #include "mongo/util/time_support.h" namespace mongo { @@ -153,20 +154,20 @@ private: Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FifoTicketHolder::_resizeMutex"); AtomicWord<int> _capacity; - AtomicWord<int> _numAvailable; - AtomicWord<int> _elementsInQueue; enum class WaitingState { Waiting, Cancelled, Assigned }; struct WaitingElement { stdx::condition_variable signaler; Mutex modificationMutex = MONGO_MAKE_LATCH( - HierarchicalAcquisitionLevel(1), "FifoTicketHolder::WaitingElement::modificationMutex"); + HierarchicalAcquisitionLevel(0), "FifoTicketHolder::WaitingElement::modificationMutex"); WaitingState state; }; - MultiProducerMultiConsumerQueue<std::shared_ptr<WaitingElement>> _queue; + std::queue<std::shared_ptr<WaitingElement>> _queue; // _queueMutex protects all modifications made to either the _queue, or the statistics of the // queue. Mutex _queueMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FifoTicketHolder::_queueMutex"); + AtomicWord<int> _enqueuedElements; + AtomicWord<int> _ticketsAvailable; }; class ScopedTicket { diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp new file mode 100644 index 00000000000..3ea382ba62e --- /dev/null +++ b/src/mongo/util/concurrency/ticketholder_bm.cpp @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <benchmark/benchmark.h> + +#include <vector> + +#include "mongo/util/concurrency/ticketholder.h" + + +namespace mongo { +namespace { + +static int kTickets = 128; +static int kThreadMin = 8; +static int kThreadMax = 1024; + +template <class TicketHolderImpl> +void BM_tryAcquire(benchmark::State& state) { + static std::unique_ptr<TicketHolder> ticketHolder; + if (state.thread_index == 0) { + ticketHolder = std::make_unique<TicketHolderImpl>(kTickets); + } + double attempted = 0, acquired = 0; + for (auto _ : state) { + auto hasAcquired = ticketHolder->tryAcquire(); + state.PauseTiming(); + sleepmicros(1); + attempted++; + if (hasAcquired) { + acquired++; + ticketHolder->release(); + } + state.ResumeTiming(); + } + state.counters["Attempted"] = attempted; + state.counters["Acquired"] = acquired; +} + +BENCHMARK_TEMPLATE(BM_tryAcquire, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +BENCHMARK_TEMPLATE(BM_tryAcquire, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +template <class TicketHolderImpl> +void BM_acquire(benchmark::State& state) { + static std::unique_ptr<TicketHolder> ticketHolder; + if (state.thread_index == 0) { + ticketHolder = std::make_unique<TicketHolderImpl>(kTickets); + } + double acquired = 0; + for (auto _ : state) { + ticketHolder->waitForTicket(); + state.PauseTiming(); + sleepmicros(1); + ticketHolder->release(); + acquired++; + state.ResumeTiming(); + } + state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate); + state.counters["AcquiredPerThread"] = + benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate); +} + +BENCHMARK_TEMPLATE(BM_acquire, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +BENCHMARK_TEMPLATE(BM_acquire, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +template <class TicketHolderImpl> +void BM_release(benchmark::State& state) { + static std::unique_ptr<TicketHolder> ticketHolder; + if (state.thread_index == 0) { + ticketHolder = std::make_unique<TicketHolderImpl>(kTickets); + } + double acquired = 0; + for (auto _ : state) { + state.PauseTiming(); + ticketHolder->waitForTicket(); + sleepmicros(1); + state.ResumeTiming(); + ticketHolder->release(); + acquired++; + } + state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate); + state.counters["AcquiredPerThread"] = + benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate); +} + +BENCHMARK_TEMPLATE(BM_release, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +BENCHMARK_TEMPLATE(BM_release, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + + +template <class H> +void BM_acquireAndRelease(benchmark::State& state) { + static std::unique_ptr<TicketHolder> ticketHolder; + if (state.thread_index == 0) { + ticketHolder = std::make_unique<H>(kTickets); + } + double acquired = 0; + for (auto _ : state) { + ticketHolder->waitForTicket(); + state.PauseTiming(); + sleepmicros(1); + state.ResumeTiming(); + ticketHolder->release(); + acquired++; + } + state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate); + state.counters["AcquiredPerThread"] = + benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate); +} + +BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder) + ->ThreadRange(kThreadMin, kThreadMax); + +BENCHMARK_TEMPLATE(BM_acquireAndRelease, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax); + +} // namespace +} // namespace mongo |