summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-03-16 10:23:54 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-16 10:59:00 +0000
commit6184015b89d61a890d11f682d172c8473b60b513 (patch)
treee3728eb7e3441907ada05e62decaeea1e0ba9dc1
parentdfe7541231e794887be35eff95482285f6e21f35 (diff)
downloadmongo-6184015b89d61a890d11f682d172c8473b60b513.tar.gz
SERVER-64076 Fix deadlock when waiting for tickets
-rw-r--r--src/mongo/util/concurrency/SConscript11
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp90
-rw-r--r--src/mongo/util/concurrency/ticketholder.h11
-rw-r--r--src/mongo/util/concurrency/ticketholder_bm.cpp145
4 files changed, 210 insertions, 47 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 607f155aae8..4a737483d8a 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -58,3 +58,14 @@ env.CppUnitTest(
'ticketholder',
]
)
+
+env.Benchmark(
+ target='ticketholder_bm',
+ source=[
+ 'ticketholder_bm.cpp',
+ ],
+ LIBDEPS=[
+ 'ticketholder',
+ ],
+)
+
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index e46ff7c0c0f..4043eea04b7 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -262,61 +262,63 @@ bool SemaphoreTicketHolder::_tryAcquire() {
}
#endif
-FifoTicketHolder::FifoTicketHolder(int num)
- : _capacity(num), _numAvailable(num), _elementsInQueue(0) {}
+FifoTicketHolder::FifoTicketHolder(int num) : _capacity(num) {
+ _ticketsAvailable.store(num);
+ _enqueuedElements.store(0);
+}
FifoTicketHolder::~FifoTicketHolder() {}
int FifoTicketHolder::available() const {
- return _numAvailable.load();
+ return _ticketsAvailable.load();
}
int FifoTicketHolder::used() const {
- return _capacity.load() - _numAvailable.load();
+ return outof() - available();
}
int FifoTicketHolder::outof() const {
return _capacity.load();
}
-void FifoTicketHolder::_release(WithLock) {
- auto newAvailable = _numAvailable.addAndFetch(1);
- // This is not an optimization but defensively programming against possible edge cases that we
- // haven't thought of. For example, if we have the situation where we have N tickets available
- // but something is in the queue, we must push it to the end of the queue. In this case having a
- // batch release process would be beneficial as it would remove as many elements as it can from
- // the queue, leading us back to the fast ticket path in the normal case.
- while (newAvailable > 0) {
- auto waitingElement = _queue.tryPop();
- if (waitingElement) {
- auto& elem = *waitingElement;
- _elementsInQueue.subtractAndFetch(1);
- stdx::lock_guard lk(elem->modificationMutex);
- if (elem->state != WaitingState::Waiting) {
- // If the operation has already been finalized we skip the element and don't assign
- // a ticket.
- continue;
+void FifoTicketHolder::release() {
+ stdx::lock_guard lk(_queueMutex);
+ // This loop will most of the time be executed only once. In case some operations in the
+ // queue have been cancelled or already took a ticket the releasing operation should search for
+ // a waiting operation to avoid leaving an operation waiting indefinitely.
+ while (true) {
+ if (!_queue.empty()) {
+ auto& elem = _queue.front();
+ _enqueuedElements.subtractAndFetch(1);
+ {
+ stdx::lock_guard elemLk(elem->modificationMutex);
+ if (elem->state != WaitingState::Waiting) {
+ // If the operation has already been finalized we skip the element and don't
+ // assign a ticket.
+ _queue.pop();
+ continue;
+ }
+ elem->state = WaitingState::Assigned;
}
- elem->state = WaitingState::Assigned;
elem->signaler.notify_all();
- newAvailable = _numAvailable.subtractAndFetch(1);
+ _queue.pop();
} else {
- return;
+ _ticketsAvailable.addAndFetch(1);
}
+ return;
}
-} // namespace mongo
-
-void FifoTicketHolder::release() {
- stdx::lock_guard lk(_queueMutex);
- _release(lk);
}
bool FifoTicketHolder::tryAcquire() {
- stdx::lock_guard lk(_queueMutex);
- if (_numAvailable.load() > 0 && _elementsInQueue.load() == 0) {
- _numAvailable.subtractAndFetch(1);
- return true;
- } else {
+ auto queued = _enqueuedElements.load();
+ if (queued > 0)
+ return false;
+
+ auto remaining = _ticketsAvailable.subtractAndFetch(1);
+ if (remaining < 0) {
+ _ticketsAvailable.addAndFetch(1);
return false;
}
+
+ return true;
}
void FifoTicketHolder::waitForTicket(OperationContext* opCtx) {
@@ -333,14 +335,19 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until)
waitingElement->state = WaitingState::Waiting;
{
stdx::lock_guard lk(_queueMutex);
- if (opCtx) {
- _queue.push(std::shared_ptr(waitingElement), opCtx);
- } else {
- _queue.push(std::shared_ptr(waitingElement));
+ _enqueuedElements.addAndFetch(1);
+ // Check for available tickets under the queue lock, in case a ticket has just been
+ // released.
+ auto remaining = _ticketsAvailable.subtractAndFetch(1);
+ if (remaining >= 0) {
+ _enqueuedElements.subtractAndFetch(1);
+ return true;
}
- _elementsInQueue.addAndFetch(1);
+ _ticketsAvailable.addAndFetch(1);
+ // We copy-construct the shared_ptr here as the waiting element needs to be alive in both
+ // release() and waitForTicket(). Otherwise the code could lead to a segmentation fault
+ _queue.emplace(waitingElement);
}
-
ScopeGuard cancelWait([&] {
bool hasAssignedTicket = false;
{
@@ -351,8 +358,7 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until)
if (hasAssignedTicket) {
// To cover the edge case of getting a ticket assigned before cancelling the ticket
// request. As we have been granted a ticket we must release it.
- stdx::lock_guard queueLock(_queueMutex);
- _release(queueLock);
+ release();
}
});
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index d0dcfcb746b..b4cf246bdac 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -32,13 +32,14 @@
#include <semaphore.h>
#endif
+#include <queue>
+
#include "mongo/db/operation_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/future.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/hierarchical_acquisition.h"
-#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -153,20 +154,20 @@ private:
Mutex _resizeMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FifoTicketHolder::_resizeMutex");
AtomicWord<int> _capacity;
- AtomicWord<int> _numAvailable;
- AtomicWord<int> _elementsInQueue;
enum class WaitingState { Waiting, Cancelled, Assigned };
struct WaitingElement {
stdx::condition_variable signaler;
Mutex modificationMutex = MONGO_MAKE_LATCH(
- HierarchicalAcquisitionLevel(1), "FifoTicketHolder::WaitingElement::modificationMutex");
+ HierarchicalAcquisitionLevel(0), "FifoTicketHolder::WaitingElement::modificationMutex");
WaitingState state;
};
- MultiProducerMultiConsumerQueue<std::shared_ptr<WaitingElement>> _queue;
+ std::queue<std::shared_ptr<WaitingElement>> _queue;
// _queueMutex protects all modifications made to either the _queue, or the statistics of the
// queue.
Mutex _queueMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "FifoTicketHolder::_queueMutex");
+ AtomicWord<int> _enqueuedElements;
+ AtomicWord<int> _ticketsAvailable;
};
class ScopedTicket {
diff --git a/src/mongo/util/concurrency/ticketholder_bm.cpp b/src/mongo/util/concurrency/ticketholder_bm.cpp
new file mode 100644
index 00000000000..3ea382ba62e
--- /dev/null
+++ b/src/mongo/util/concurrency/ticketholder_bm.cpp
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <benchmark/benchmark.h>
+
+#include <vector>
+
+#include "mongo/util/concurrency/ticketholder.h"
+
+
+namespace mongo {
+namespace {
+
+static int kTickets = 128;
+static int kThreadMin = 8;
+static int kThreadMax = 1024;
+
+template <class TicketHolderImpl>
+void BM_tryAcquire(benchmark::State& state) {
+ static std::unique_ptr<TicketHolder> ticketHolder;
+ if (state.thread_index == 0) {
+ ticketHolder = std::make_unique<TicketHolderImpl>(kTickets);
+ }
+ double attempted = 0, acquired = 0;
+ for (auto _ : state) {
+ auto hasAcquired = ticketHolder->tryAcquire();
+ state.PauseTiming();
+ sleepmicros(1);
+ attempted++;
+ if (hasAcquired) {
+ acquired++;
+ ticketHolder->release();
+ }
+ state.ResumeTiming();
+ }
+ state.counters["Attempted"] = attempted;
+ state.counters["Acquired"] = acquired;
+}
+
+BENCHMARK_TEMPLATE(BM_tryAcquire, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+BENCHMARK_TEMPLATE(BM_tryAcquire, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+template <class TicketHolderImpl>
+void BM_acquire(benchmark::State& state) {
+ static std::unique_ptr<TicketHolder> ticketHolder;
+ if (state.thread_index == 0) {
+ ticketHolder = std::make_unique<TicketHolderImpl>(kTickets);
+ }
+ double acquired = 0;
+ for (auto _ : state) {
+ ticketHolder->waitForTicket();
+ state.PauseTiming();
+ sleepmicros(1);
+ ticketHolder->release();
+ acquired++;
+ state.ResumeTiming();
+ }
+ state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate);
+ state.counters["AcquiredPerThread"] =
+ benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate);
+}
+
+BENCHMARK_TEMPLATE(BM_acquire, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+BENCHMARK_TEMPLATE(BM_acquire, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+template <class TicketHolderImpl>
+void BM_release(benchmark::State& state) {
+ static std::unique_ptr<TicketHolder> ticketHolder;
+ if (state.thread_index == 0) {
+ ticketHolder = std::make_unique<TicketHolderImpl>(kTickets);
+ }
+ double acquired = 0;
+ for (auto _ : state) {
+ state.PauseTiming();
+ ticketHolder->waitForTicket();
+ sleepmicros(1);
+ state.ResumeTiming();
+ ticketHolder->release();
+ acquired++;
+ }
+ state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate);
+ state.counters["AcquiredPerThread"] =
+ benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate);
+}
+
+BENCHMARK_TEMPLATE(BM_release, SemaphoreTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+BENCHMARK_TEMPLATE(BM_release, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+
+template <class H>
+void BM_acquireAndRelease(benchmark::State& state) {
+ static std::unique_ptr<TicketHolder> ticketHolder;
+ if (state.thread_index == 0) {
+ ticketHolder = std::make_unique<H>(kTickets);
+ }
+ double acquired = 0;
+ for (auto _ : state) {
+ ticketHolder->waitForTicket();
+ state.PauseTiming();
+ sleepmicros(1);
+ state.ResumeTiming();
+ ticketHolder->release();
+ acquired++;
+ }
+ state.counters["Acquired"] = benchmark::Counter(acquired, benchmark::Counter::kIsRate);
+ state.counters["AcquiredPerThread"] =
+ benchmark::Counter(acquired, benchmark::Counter::kAvgThreadsRate);
+}
+
+BENCHMARK_TEMPLATE(BM_acquireAndRelease, SemaphoreTicketHolder)
+ ->ThreadRange(kThreadMin, kThreadMax);
+
+BENCHMARK_TEMPLATE(BM_acquireAndRelease, FifoTicketHolder)->ThreadRange(kThreadMin, kThreadMax);
+
+} // namespace
+} // namespace mongo