summaryrefslogtreecommitdiff
path: root/src/mongo/util
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2023-01-10 15:45:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-10 17:06:33 +0000
commit201b01bb23d6051db87b2bcb0be4ad123ba7c99f (patch)
tree7f4064c77fad024f2458e92306b56cf37c2ad1be /src/mongo/util
parent4397941a80101eefd7093604a87ce4fce111975e (diff)
downloadmongo-201b01bb23d6051db87b2bcb0be4ad123ba7c99f.tar.gz
SERVER-72067 Implement ticket queue mechanism with futex
Diffstat (limited to 'src/mongo/util')
-rw-r--r--src/mongo/util/concurrency/SConscript18
-rw-r--r--src/mongo/util/concurrency/ticket_broker.cpp225
-rw-r--r--src/mongo/util/concurrency/ticket_broker.h140
-rw-r--r--src/mongo/util/concurrency/ticket_broker_test.cpp167
4 files changed, 546 insertions, 4 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index de664d85453..22437b73596 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -22,6 +22,17 @@ env.Library(
],
)
+# TODO SERVER-72616: This can go away once TicketBroker is implemented in terms of atomic
+# wait/notify in C++20.
+if env.TargetOSIs('linux'):
+ env.Library(
+ target='ticket_broker',
+ source=['ticket_broker.cpp'],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ )
+
env.Library(
target='ticketholder',
source=[
@@ -52,10 +63,8 @@ env.Library(
env.CppUnitTest(
target='util_concurrency_test',
source=[
- 'spin_lock_test.cpp',
- 'thread_pool_test.cpp',
- 'ticketholder_test.cpp',
- 'with_lock_test.cpp',
+ 'spin_lock_test.cpp', 'thread_pool_test.cpp', 'ticketholder_test.cpp',
+ 'ticket_broker_test.cpp' if env.TargetOSIs('linux') else [], 'with_lock_test.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
@@ -63,6 +72,7 @@ env.CppUnitTest(
'spin_lock',
'thread_pool',
'thread_pool_test_fixture',
+ 'ticket_broker' if env.TargetOSIs('linux') else [],
'ticketholder',
],
)
diff --git a/src/mongo/util/concurrency/ticket_broker.cpp b/src/mongo/util/concurrency/ticket_broker.cpp
new file mode 100644
index 00000000000..a9d428691b0
--- /dev/null
+++ b/src/mongo/util/concurrency/ticket_broker.cpp
@@ -0,0 +1,225 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/util/concurrency/ticket_broker.h"
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/errno_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+// TODO SERVER-72616: Remove futex usage from this class in favour of atomic waits.
+#include <linux/futex.h> /* Definition of FUTEX_* constants */
+#include <sys/syscall.h> /* Definition of SYS_* constants */
+#include <unistd.h>
+
+namespace mongo {
+namespace {
+// Invokes the futex system call through syscall(2), forwarding every argument unchanged. The
+// result is whatever the system call returned; on failure the reason is left in errno.
+static int futex(uint32_t* uaddr,
+                 int futex_op,
+                 uint32_t val,
+                 const struct timespec* timeout,
+                 uint32_t* uaddr2,
+                 uint32_t val3) noexcept {
+    const auto rc = syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+    return rc;
+}
+
+// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::wait exists
+//
+// Blocks the calling thread while `atomic` still holds `expectedValue`, giving up once `until` has
+// been reached. Returns cv_status::timeout when the deadline passed and cv_status::no_timeout when
+// the value was observed to differ from `expectedValue`. Any futex error other than EAGAIN,
+// ETIMEDOUT or EINTR is fatal.
+static stdx::cv_status atomic_wait(AtomicWord<uint32_t>& atomic,
+                                   uint32_t expectedValue,
+                                   Date_t until) noexcept {
+    for (;;) {
+        if (atomic.load() != expectedValue) {
+            return stdx::cv_status::no_timeout;
+        }
+
+        // The remaining time is recomputed on every pass so that wakeups which do not change the
+        // value never extend the overall deadline.
+        const auto currentTime = Date_t::now();
+        if (currentTime >= until) {
+            return stdx::cv_status::timeout;
+        }
+        const auto remaining = until - currentTime;
+        timespec waitFor;
+        waitFor.tv_sec = remaining.count() / 1'000;
+        waitFor.tv_nsec = (remaining.count() % 1'000) * 1'000'000;
+
+        const auto rc = futex(reinterpret_cast<uint32_t*>(&atomic),
+                              FUTEX_WAIT_PRIVATE,
+                              expectedValue,
+                              &waitFor,
+                              nullptr,
+                              0);
+        if (rc == 0) {
+            // Woken up; go around again to find out whether the value really changed.
+            continue;
+        }
+
+        const auto waitError = errno;
+        if (waitError == EAGAIN) {
+            // The value had already changed by the time the kernel looked at it, which is
+            // equivalent to having been notified.
+            return stdx::cv_status::no_timeout;
+        }
+        if (waitError == ETIMEDOUT) {
+            return stdx::cv_status::timeout;
+        }
+        if (waitError != EINTR) {
+            // Nothing sensible can be done about any other failure, crash the server.
+            LOGV2_FATAL(7206704,
+                        "Error in atomic wait for ticket",
+                        "error"_attr = errorMessage(posixError(waitError)));
+        }
+        // EINTR: signal interruptions are ignored, as other signals are handled by either crashing
+        // the server or gracefully exiting it and waiting for operations to finish. Retry.
+    }
+}
+
+// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::notify_one exists.
+//
+// Asks the kernel to wake at most one thread waiting on `atomic`. A failing wake cannot be
+// recovered from and therefore brings the server down.
+static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept {
+    auto* const word = reinterpret_cast<uint32_t*>(&atomic);
+    const auto wakeResult = futex(word, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
+    if (wakeResult >= 0) {
+        return;
+    }
+    LOGV2_FATAL(7206703,
+                "Error in atomic notify for ticket",
+                "error"_attr = errorMessage(posixError(errno)));
+}
+} // namespace
+
+// A freshly constructed broker has no waiters: both ends of the queue are null and the count is 0.
+TicketBroker::TicketBroker() : _queueBegin{nullptr}, _queueEnd{nullptr}, _numQueued{0} {}
+
+// Appends `node` to the tail of the waiter queue and bumps the waiter count. The lock parameter is
+// not used by the body; it is only part of the signature.
+void TicketBroker::_registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
+                                     Node& node) noexcept {
+    _numQueued.fetchAndAdd(1);
+
+    if (_queueBegin != nullptr) {
+        // Non-empty queue: hook the node up behind the current tail and make it the new tail.
+        node.previous = _queueEnd;
+        _queueEnd->next = &node;
+        _queueEnd = &node;
+        return;
+    }
+
+    // Empty queue: the node is both the head and the tail.
+    _queueBegin = &node;
+    _queueEnd = &node;
+}
+
+// Unlinks `node` from the waiter queue unless a ticket transfer already did so. A non-zero futex
+// word marks a node that attemptToTransferTicket() has removed, in which case nothing is touched.
+void TicketBroker::_unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
+                                       Node& node) noexcept {
+    if (node.futexWord.loadRelaxed() != 0) {
+        return;
+    }
+
+    const auto lengthBeforeRemoval = _numQueued.fetchAndSubtract(1);
+
+    if (lengthBeforeRemoval == 1) {
+        // The node was the only element, so the queue is now empty.
+        _queueBegin = nullptr;
+        _queueEnd = nullptr;
+    } else if (&node == _queueBegin) {
+        // Removing the head: its successor becomes the first element.
+        _queueBegin = node.next;
+        _queueBegin->previous = nullptr;
+    } else if (&node == _queueEnd) {
+        // Removing the tail: its predecessor becomes the last element.
+        _queueEnd = node.previous;
+        _queueEnd->next = nullptr;
+    } else {
+        // Removing from the middle: splice the two neighbours together.
+        Node* const before = node.previous;
+        Node* const after = node.next;
+        before->next = after;
+        after->previous = before;
+    }
+}
+
+// Queues the calling thread as a waiter and blocks until a ticket is handed to it or `until` is
+// reached.
+//
+// `growthLock` is taken by value and is unlocked inside this function, so it is expected to be
+// held on entry. It is only re-acquired (and released again) on the timeout path.
+//
+// In the returned WaitingResult, `hasTimedOut` reports whether the wait ended because of the
+// deadline and `hasTicket` reports whether the node's futex word was non-zero at the end. Both can
+// be true at once: a transfer may land after the wait timed out but before the lock is re-acquired,
+// in which case _unregisterAsWaiter() leaves the already-removed node alone.
+TicketBroker::WaitingResult TicketBroker::attemptWaitForTicketUntil(
+ stdx::unique_lock<stdx::mutex> growthLock, Date_t until) noexcept {
+ // Stack allocate the node of the linked list, this approach lets us ignore heap memory in
+ // favour of stack memory which is dramatically cheaper. Care must be taken to ensure that there
+ // are no references left in the queue to this node once returning from the method.
+ //
+ // If std::promise didn't perform a heap allocation we could use it here.
+ Node node;
+
+ // We add ourselves as a waiter, we are still holding the lock here.
+ _registerAsWaiter(growthLock, node);
+
+ // Finished modifying the linked list, the lock can be released now.
+ growthLock.unlock();
+
+ // We now wait until obtaining the notification via the futex word. The word starts at 0 and is
+ // set to 1 by attemptToTransferTicket(), so 0 is the value to keep sleeping on.
+ auto waitResult = atomic_wait(node.futexWord, 0, until);
+ bool hasTimedOut = waitResult == stdx::cv_status::timeout;
+
+ if (hasTimedOut) {
+ // Timing out implies that the node must be removed from the list, block list modifications
+ // to prevent segmentation faults.
+ growthLock.lock();
+ _unregisterAsWaiter(growthLock, node);
+ growthLock.unlock();
+ }
+
+ // If we haven't timed out it means that the ticket has been transferred to our node. The
+ // transfer method removes the node from the linked list, so there's no cleanup to be done.
+
+ // Read the word last: on the timeout path this happens after the lock round-trip above, so a
+ // transfer that raced with the timeout is still reported as a ticket.
+ auto hasTicket = node.futexWord.load() != 0;
+
+ return TicketBroker::WaitingResult{hasTimedOut, hasTicket};
+}
+
+// Hands a ticket to the thread at the front of the waiter queue, if there is one.
+//
+// Returns true when a waiter was removed from the queue and notified, false when nobody was
+// waiting. The lock parameter is not used by the body; it is only part of the signature.
+bool TicketBroker::attemptToTransferTicket(
+    const stdx::unique_lock<stdx::mutex>& growthLock) noexcept {
+    // We can only transfer a ticket if there is a thread waiting for it.
+    if (_numQueued.loadRelaxed() <= 0) {
+        return false;
+    }
+    _numQueued.fetchAndSubtract(1);
+
+    // We notify the first element in the queue. To avoid race conditions we first remove the node
+    // and then notify the waiting thread. Doing the opposite risks a segmentation fault if the
+    // node gets deallocated before we remove it from the list.
+    auto node = _queueBegin;
+    _queueBegin = node->next;
+    if (_queueBegin) {
+        // Next node isn't empty, we must inform it that it's first in line.
+        _queueBegin->previous = nullptr;
+    } else {
+        // The queue is now empty. Clear the tail as well so that it does not keep pointing at a
+        // node living on the stack of a thread that is about to return.
+        _queueEnd = nullptr;
+    }
+
+    // Take the reference before publishing the ticket: once the word is non-zero the waiter is
+    // free to return, so the node must not be read through again.
+    auto& futexAtomic = node->futexWord;
+    futexAtomic.store(1);
+    // We've transferred a ticket and removed the node from the list, inform the waiting thread
+    // that it can proceed.
+    atomic_notify_one(futexAtomic);
+    return true;
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_broker.h b/src/mongo/util/concurrency/ticket_broker.h
new file mode 100644
index 00000000000..3cc00c0815f
--- /dev/null
+++ b/src/mongo/util/concurrency/ticket_broker.h
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * A ticket broker between threads waiting (Ticket-waiters) and threads willing to give a ticket
+ * (Ticket-releasers).
+ *
+ * This broker requires external synchronisation (growthLock) so that it can function correctly. The
+ * methods that require synchronisation have their signature written so that it enforces correct
+ * usage. Using it with a different mutex in two separate calls is undefined behaviour and will
+ * almost certainly cause a deadlock or segmentation fault.
+ *
+ * This class is to be used when more than one broker is necessary in a given scope and certain
+ * guarantees have to be made. Given the following conditions for usage:
+ *
+ * - Ticket-waiters must acquire the growthLock in order to enter the broker.
+ * - Ticket-releasers must acquire the growthLock in order to transfer their ticket to a waiter in
+ * the broker.
+ *
+ * Then this class can provide the following guarantees across the multiple instances of it in
+ * scope:
+ *
+ * - Ticket-releasers can attempt to transfer their ticket to each broker only once. No other waiter
+ * will appear between attempts.
+ * - Ticket-waiters will get scheduled for execution once transferred a ticket without having to
+ * acquire any mutex.
+ *
+ * This is useful for example if you need to build a scheduler based on top of multiple brokers
+ * representing different groups of operations. The ticket-releasers can have "snapshot" guarantees
+ * of the state of the system and choose who to transfer the ticket to based on some arbitrary
+ * logic.
+ *
+ * Note that the implementation allows granular thread selection for wakeup but we've chosen to use
+ * FIFO semantics to make the critical section as short lived as possible.
+ */
+class TicketBroker {
+public:
+ TicketBroker();
+
+ /**
+ * Outcome of a wait attempt. The two flags are independent of each other: a waiter whose wait
+ * timed out may still have been handed a ticket before it managed to leave the queue.
+ */
+ struct WaitingResult {
+ // True if the wait ended because the deadline was reached.
+ bool hasTimedOut;
+ // True if a ticket had been transferred to the waiter by the time the call returned.
+ bool hasTicket;
+ };
+
+ /**
+ * Attempts to wait for a ticket until it reaches the specified timeout. The return type will
+ * contain whether the attempt was successful and/or if it timed out.
+ *
+ * This method consumes the lock since it will internally unlock it. It only locks again if the
+ * call times out, in order to remove the thread from the waiting list.
+ */
+ WaitingResult attemptWaitForTicketUntil(stdx::unique_lock<stdx::mutex> growthLock,
+ Date_t until) noexcept;
+
+ /**
+ * Transfers the ticket if there is a thread to transfer it to. Returns true if the ticket was
+ * transferred.
+ *
+ * Guarantee: apart from the removal of the notified waiter performed by this call, no
+ * modifications to the internal linked list will take place while holding the lock.
+ */
+ bool attemptToTransferTicket(const stdx::unique_lock<stdx::mutex>& growthLock) noexcept;
+
+ /**
+ * Returns the number of threads waiting.
+ *
+ * This method is meant for monitoring and tests only. The value is a snapshot of the system at
+ * the moment of calling the method. It will potentially be out of date as soon as it returns.
+ *
+ * This value will be consistent if called while holding the growthLock.
+ */
+ int waitingThreadsRelaxed() const noexcept {
+ return _numQueued.loadRelaxed();
+ }
+
+private:
+ /**
+ * Node structure of the linked list, it has to be a doubly linked list in order to allow
+ * random removal of nodes. Lifetime of these nodes will reside in the stack memory of the
+ * thread waiting.
+ */
+ struct Node {
+ // Neighbour towards the front of the queue, null for the first node.
+ Node* previous{nullptr};
+ // Word the waiting thread sleeps on. It is 0 while waiting and is set to 1 when a ticket
+ // has been transferred to this node.
+ AtomicWord<uint32_t> futexWord{0};
+ // Neighbour towards the back of the queue, null for the last node.
+ Node* next{nullptr};
+ };
+
+ // Append the node to the linked list.
+ void _registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept;
+
+ // Removes the node from the linked list, unless a ticket transfer has already removed it.
+ void _unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept;
+
+ /**
+ * Edge nodes of the linked list. To append we need to know the end of the list in order to make
+ * appends O(1) instead of O(n).
+ */
+ Node* _queueBegin;
+ Node* _queueEnd;
+
+ /**
+ * Number of queued threads in the linked list.
+ */
+ AtomicWord<int> _numQueued;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticket_broker_test.cpp b/src/mongo/util/concurrency/ticket_broker_test.cpp
new file mode 100644
index 00000000000..20008a721bb
--- /dev/null
+++ b/src/mongo/util/concurrency/ticket_broker_test.cpp
@@ -0,0 +1,167 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <condition_variable>
+#include <mutex>
+
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/ticket_broker.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/timer.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+/**
+ * For use inside a predicate passed to assertSoon: if `exp` is false, logs the failed expression
+ * together with its location and makes the enclosing predicate return false so that it gets
+ * retried. Does nothing when `exp` is true.
+ *
+ * The body is wrapped in do/while(false) so that the macro expands to a single statement and is
+ * safe to use with a trailing semicolon, including inside an unbraced if/else.
+ */
+#define ASSERT_SOON_EXP(exp)                                 \
+    do {                                                     \
+        if (!(exp)) {                                        \
+            LOGV2_WARNING(7206702,                           \
+                          "Expression failed, retrying",     \
+                          "exp"_attr = #exp,                 \
+                          "file"_attr = __FILE__,            \
+                          "line"_attr = __LINE__);           \
+            return false;                                    \
+        }                                                    \
+    } while (false)
+
+namespace {
+using namespace mongo;
+
+// Upper bound on how long assertSoon keeps retrying before failing the test.
+static inline const Seconds kWaitTimeout{20};
+// Pause between two consecutive evaluations of an assertSoon predicate.
+static inline const Milliseconds kSleepTime{1};
+
+/**
+ * Repeatedly evaluates the predicate until it returns true, sleeping for kSleepTime between
+ * attempts instead of spinning. If the predicate has not returned true after kWaitTimeout the
+ * failure is logged and the test fails.
+ *
+ * The body is wrapped in do/while(false) so that the macro expands to a single statement.
+ */
+#define assertSoon(predicate)                                                                   \
+    do {                                                                                        \
+        Timer t;                                                                                \
+        while (!(predicate)()) {                                                                \
+            if (t.elapsed() >= kWaitTimeout) {                                                  \
+                LOGV2_ERROR(                                                                    \
+                    7206701,                                                                    \
+                    "assertSoon failed, please check the logs for the reason all attempts have " \
+                    "failed.",                                                                  \
+                    "file"_attr = __FILE__,                                                     \
+                    "line"_attr = __LINE__);                                                    \
+                FAIL("assertSoon failed");                                                      \
+            }                                                                                   \
+            sleepFor(kSleepTime);                                                               \
+        }                                                                                       \
+    } while (false)
+
+// With nobody ever transferring a ticket, a wait must end by timing out and without a ticket, both
+// for a deadline slightly in the future and for one that has already passed.
+TEST(TicketBrokerTest, BasicTimeout) {
+    TicketBroker broker;
+
+    stdx::mutex brokerMutex;  // NOLINT
+
+    // Takes the lock, waits until the deadline produced by `makeDeadline` and checks the outcome.
+    // The deadline is computed after the lock has been acquired.
+    const auto expectTimeoutWithoutTicket = [&](auto makeDeadline) {
+        stdx::unique_lock waiterLock(brokerMutex);
+        const auto outcome = broker.attemptWaitForTicketUntil(std::move(waiterLock), makeDeadline());
+        ASSERT_FALSE(outcome.hasTicket);
+        ASSERT_TRUE(outcome.hasTimedOut);
+    };
+
+    expectTimeoutWithoutTicket([] { return Date_t::now() + Milliseconds{10}; });
+    expectTimeoutWithoutTicket([] { return Date_t::min(); });
+}
+
+// Checks that tickets reach waiting threads: first with a single waiter, then with several
+// waiters that are released one ticket at a time.
+TEST(TicketBrokerTest, HandOverWorks) {
+    TicketBroker broker;
+
+    stdx::mutex brokerMutex;  // NOLINT
+
+    // Single waiter.
+    {
+        {
+            // Nobody is queued yet, so there is no one to hand the ticket to.
+            stdx::unique_lock growthLock(brokerMutex);
+            ASSERT_FALSE(broker.attemptToTransferTicket(growthLock));
+        }
+
+        stdx::thread waitingThread([&] {
+            stdx::unique_lock lk(brokerMutex);
+            auto result =
+                broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10});
+            ASSERT_TRUE(result.hasTicket);
+            ASSERT_FALSE(result.hasTimedOut);
+        });
+
+        // Wait for the thread to be queued before attempting the transfer.
+        assertSoon([&] {
+            ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 1);
+            return true;
+        });
+
+        {
+            stdx::unique_lock growthLock(brokerMutex);
+            ASSERT_TRUE(broker.attemptToTransferTicket(growthLock));
+        }
+
+        waitingThread.join();
+    }
+
+    // Several waiters, each transfer must release exactly one of them.
+    {
+        static constexpr auto threadsToTest = 10;
+        AtomicWord<int32_t> pendingThreads{threadsToTest};
+        std::vector<stdx::thread> threads;
+
+        for (int i = 0; i < threadsToTest; i++) {
+            threads.emplace_back([&] {
+                stdx::unique_lock lk(brokerMutex);
+                auto result =
+                    broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10});
+                ASSERT_TRUE(result.hasTicket);
+                ASSERT_FALSE(result.hasTimedOut);
+                pendingThreads.subtractAndFetch(1);
+            });
+        }
+
+        // Compare against the constant rather than a repeated literal so that the check follows
+        // the number of threads actually started.
+        assertSoon([&] {
+            ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == threadsToTest);
+            return true;
+        });
+
+        for (int i = 1; i <= threadsToTest; i++) {
+            stdx::unique_lock growthLock(brokerMutex);
+            ASSERT_TRUE(broker.attemptToTransferTicket(growthLock));
+            // After the i-th transfer exactly i threads must have finished.
+            assertSoon([&] {
+                ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i);
+                return true;
+            });
+        }
+
+        for (auto& thread : threads) {
+            thread.join();
+        }
+    }
+}
+} // namespace