diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2023-01-10 15:45:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-10 17:06:33 +0000 |
commit | 201b01bb23d6051db87b2bcb0be4ad123ba7c99f (patch) | |
tree | 7f4064c77fad024f2458e92306b56cf37c2ad1be /src/mongo/util | |
parent | 4397941a80101eefd7093604a87ce4fce111975e (diff) | |
download | mongo-201b01bb23d6051db87b2bcb0be4ad123ba7c99f.tar.gz |
SERVER-72067 Implement ticket queue mechanism with futex
Diffstat (limited to 'src/mongo/util')
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 18 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticket_broker.cpp | 225 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticket_broker.h | 140 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticket_broker_test.cpp | 167 |
4 files changed, 546 insertions, 4 deletions
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index de664d85453..22437b73596 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -22,6 +22,17 @@ env.Library( ], ) +# TODO SERVER-72616: This can go away once TicketBroker is implemented in terms of atomic +# wait/notify in C++20. +if env.TargetOSIs('linux'): + env.Library( + target='ticket_broker', + source=['ticket_broker.cpp'], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + ) + env.Library( target='ticketholder', source=[ @@ -52,10 +63,8 @@ env.Library( env.CppUnitTest( target='util_concurrency_test', source=[ - 'spin_lock_test.cpp', - 'thread_pool_test.cpp', - 'ticketholder_test.cpp', - 'with_lock_test.cpp', + 'spin_lock_test.cpp', 'thread_pool_test.cpp', 'ticketholder_test.cpp', + 'ticket_broker_test.cpp' if env.TargetOSIs('linux') else [], 'with_lock_test.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', @@ -63,6 +72,7 @@ env.CppUnitTest( 'spin_lock', 'thread_pool', 'thread_pool_test_fixture', + 'ticket_broker' if env.TargetOSIs('linux') else [], 'ticketholder', ], ) diff --git a/src/mongo/util/concurrency/ticket_broker.cpp b/src/mongo/util/concurrency/ticket_broker.cpp new file mode 100644 index 00000000000..a9d428691b0 --- /dev/null +++ b/src/mongo/util/concurrency/ticket_broker.cpp @@ -0,0 +1,225 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/util/concurrency/ticket_broker.h" +#include "mongo/logv2/log.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/errno_util.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +// TODO SERVER-72616: Remove futex usage from this class in favour of atomic waits. +#include <linux/futex.h> /* Definition of FUTEX_* constants */ +#include <sys/syscall.h> /* Definition of SYS_* constants */ +#include <unistd.h> + +namespace mongo { +namespace { +static int futex(uint32_t* uaddr, + int futex_op, + uint32_t val, + const struct timespec* timeout, + uint32_t* uaddr2, + uint32_t val3) noexcept { + return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); +} + +// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::wait exists +static stdx::cv_status atomic_wait(AtomicWord<uint32_t>& atomic, + uint32_t expectedValue, + Date_t until) noexcept { + while (atomic.load() == expectedValue) { + // Prepare the timeout value for the futex call. 
+ timespec ts; + auto now = Date_t::now(); + if (now >= until) { + return stdx::cv_status::timeout; + } + auto millis = until - now; + ts.tv_sec = millis.count() / 1'000; + ts.tv_nsec = (millis.count() % 1'000) * 1'000'000; + + auto futexResult = futex(reinterpret_cast<uint32_t*>(&atomic), + FUTEX_WAIT_PRIVATE, + expectedValue, + &ts, + nullptr, + 0); + if (futexResult != 0) { + switch (errno) { + // The value has changed before we called futex wait, we treat this as a + // notification and exit. + case EAGAIN: + return stdx::cv_status::no_timeout; + case ETIMEDOUT: + return stdx::cv_status::timeout; + // We ignore signal interruptions as other signals are handled by either crashing + // the server or gracefully exiting the server and waiting for operations to finish. + case EINTR: + break; + // All other errors are unrecoverable, fassert and crash the server. + default: { + LOGV2_FATAL(7206704, + "Error in atomic wait for ticket", + "error"_attr = errorMessage(posixError(errno))); + } + } + } + } + return stdx::cv_status::no_timeout; +} + +// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::notify_one exists. +static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept { + auto result = + futex(reinterpret_cast<uint32_t*>(&atomic), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); + if (result < 0) { + // We treat possible errors here as a server crash since we cannot recover from them. + LOGV2_FATAL(7206703, + "Error in atomic notify for ticket", + "error"_attr = errorMessage(posixError(errno))); + } +} +} // namespace + +TicketBroker::TicketBroker() : _queueBegin(nullptr), _queueEnd(nullptr), _numQueued(0) {} + +void TicketBroker::_registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, + Node& node) noexcept { + // We register the node. + _numQueued.fetchAndAdd(1); + + if (_queueBegin == nullptr) { + // If the list is empty we are the first node. 
+ _queueBegin = &node; + _queueEnd = &node; + } else { + // Otherwise we're the new end and must link the preceding node to us, and us to the + // preceding node. + _queueEnd->next = &node; + node.previous = _queueEnd; + _queueEnd = &node; + } +} + +void TicketBroker::_unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, + Node& node) noexcept { + // We've been unregistered by a ticket transfer, nothing to do as the transferer already removed + // us. + if (node.futexWord.loadRelaxed() != 0) { + return; + } + + auto previousLength = _numQueued.fetchAndSubtract(1); + // If there was only 1 node it was us, the queue will now be empty. + if (previousLength == 1) { + _queueBegin = _queueEnd = nullptr; + return; + } + // If the beginning of the linked list is this node we advance it to the next element. + if (_queueBegin == &node) { + _queueBegin = node.next; + node.next->previous = nullptr; + return; + } + + // If the end of the queue is this node, then the new end is the preceding node. + if (_queueEnd == &node) { + _queueEnd = node.previous; + node.previous->next = nullptr; + return; + } + + // Otherwise we're in the middle of the list. Preceding and successive nodes must be updated + // accordingly. + node.previous->next = node.next; + node.next->previous = node.previous; +} + +TicketBroker::WaitingResult TicketBroker::attemptWaitForTicketUntil( + stdx::unique_lock<stdx::mutex> growthLock, Date_t until) noexcept { + // Stack allocate the node of the linked list, this approach lets us ignore heap memory in + // favour of stack memory which is dramatically cheaper. Care must be taken to ensure that there + // are no references left in the queue to this node once returning from the method. + // + // If std::promise didn't perform a heap allocation we could use it here. + Node node; + + // We add ourselves as a waiter, we are still holding the lock here. 
+ _registerAsWaiter(growthLock, node); + + // Finished modifying the linked list, the lock can be released now. + growthLock.unlock(); + + // We now wait until obtaining the notification via the futex word. + auto waitResult = atomic_wait(node.futexWord, 0, until); + bool hasTimedOut = waitResult == stdx::cv_status::timeout; + + if (hasTimedOut) { + // Timing out implies that the node must be removed from the list, block list modifications + // to prevent segmentation faults. + growthLock.lock(); + _unregisterAsWaiter(growthLock, node); + growthLock.unlock(); + } + + // If we haven't timed out it means that the ticket has been transferred to our node. The + // transfer method removes the node from the linked list, so there's no cleanup to be done. + + auto hasTicket = node.futexWord.load() != 0; + + return TicketBroker::WaitingResult{hasTimedOut, hasTicket}; +} + +bool TicketBroker::attemptToTransferTicket( + const stdx::unique_lock<stdx::mutex>& growthLock) noexcept { + // We can only transfer a ticket if there is a thread waiting for it. + if (_numQueued.loadRelaxed() > 0) { + _numQueued.fetchAndSubtract(1); + + // We notify the first element in the queue. To avoid race conditions we first remove the + // node and then notify the waiting thread. Doing the opposite risks a segmentation fault if + // the node gets deallocated before we remove it from the list. + auto node = _queueBegin; + _queueBegin = node->next; + if (_queueBegin) { + // Next node isn't empty, we must inform it that it's first in line. + _queueBegin->previous = nullptr; + } + auto& futexAtomic = node->futexWord; + futexAtomic.store(1); + // We've transferred a ticket and removed the node from the list, inform the waiting thread + // that it can proceed. 
+ atomic_notify_one(futexAtomic); + return true; + } + return false; +} + +} // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_broker.h b/src/mongo/util/concurrency/ticket_broker.h new file mode 100644 index 00000000000..3cc00c0815f --- /dev/null +++ b/src/mongo/util/concurrency/ticket_broker.h @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ +#pragma once + +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * A ticket broker between threads waiting (Ticket-waiters) and threads willing to give a ticket + * (Ticket-releasers). + * + * This broker requires external synchronisation (growthLock) so that it can function correctly. The + * methods that require synchronisation have their signature written so that it enforces correct + * usage. Using it with a different mutex in two separate calls is undefined behaviour and will + * almost certainly cause a deadlock or segmentation fault. + * + * This class is to be used when more than one broker is necessary in a given scope and certain + * guarantees have to be made. Given the following conditions for usage: + * + * - Ticket-waiters must acquire the growthLock in order to enter the broker. + * - Ticket-releasers must acquire the growthLock in order to transfer their ticket to a waiter in + * the broker. + * + * Then this class can provide the following guarantees across the multiple instances of it in + * scope: + * + * - Ticket-releasers can attempt to transfer their ticket to each broker only once. No other waiter + * will appear between attempts. + * - Ticket-waiters will get scheduled for execution once transferred a ticket without having to + * acquire any mutex. + * + * This is useful for example if you need to build a scheduler based on top of multiple brokers + * representing different groups of operations. The ticket-releasers can have "snapshot" guarantees + * of the state of the system and choose who to transfer the ticket to based on some arbitrary + * logic. + * + * Note that the implementation allows granular thread selection for wakeup but we've chosen to use + * FIFO semantics to make the critical section as short lived as possible. 
+ */ +class TicketBroker { +public: + TicketBroker(); + + struct WaitingResult { + bool hasTimedOut; + bool hasTicket; + }; + + /** + * Attempts to wait for a ticket until it reaches the specified timeout. The return type will + * contain whether the attempt was successful and/or if it timed out. + * + * This method consumes the lock since it will internally unlock it, only locking again if the + * call times out in order to remove the thread from the waiting list. + */ + WaitingResult attemptWaitForTicketUntil(stdx::unique_lock<stdx::mutex> growthLock, + Date_t until) noexcept; + + /** + * Transfers the ticket if there is a thread to transfer it to. Returns true if the ticket was + * transferred. + * + * Guarantee: No other thread will modify the internal linked list while the caller holds the + * lock. + */ + bool attemptToTransferTicket(const stdx::unique_lock<stdx::mutex>& growthLock) noexcept; + + /** + * Returns the number of threads waiting. + * + * This method is meant for monitoring and tests only. The value is a snapshot of the system at + * the moment of calling the method. It will potentially be out of date as soon as it returns. + * + * This value will be consistent if called while holding the growthLock. + */ + int waitingThreadsRelaxed() const noexcept { + return _numQueued.loadRelaxed(); + } + +private: + /** + * Node structure of the linked list, it has to be a doubly linked list in order to allow + * random removal of nodes. Lifetime of these nodes will reside in the stack memory of the + * thread waiting. + */ + struct Node { + Node* previous{nullptr}; + AtomicWord<uint32_t> futexWord{0}; + Node* next{nullptr}; + }; + + // Append the node to the linked list. + void _registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept; + + // Removes the node from the linked list. + void _unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock, Node& node) noexcept; + + /** + * Edge nodes of the linked list. 
To append we need to know the end of the list in order to make + * appends O(1) instead of O(n). + */ + Node* _queueBegin; + Node* _queueEnd; + + /** + * Number of queued threads in the linked list. + */ + AtomicWord<int> _numQueued; +}; + +} // namespace mongo diff --git a/src/mongo/util/concurrency/ticket_broker_test.cpp b/src/mongo/util/concurrency/ticket_broker_test.cpp new file mode 100644 index 00000000000..20008a721bb --- /dev/null +++ b/src/mongo/util/concurrency/ticket_broker_test.cpp @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include <condition_variable> +#include <mutex> + +#include "mongo/logv2/log.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/barrier.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/ticket_broker.h" +#include "mongo/util/duration.h" +#include "mongo/util/timer.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#define ASSERT_SOON_EXP(exp) \ + if (!(exp)) { \ + LOGV2_WARNING(7206702, \ + "Expression failed, retrying", \ + "exp"_attr = #exp, \ + "file"_attr = __FILE__, \ + "line"_attr = __LINE__); \ + return false; \ + } + +namespace { +using namespace mongo; + +static inline const Seconds kWaitTimeout{20}; +static inline const Milliseconds kSleepTime{1}; + +/** + * Asserts that the predicate eventually returns true, failing the test after kWaitTimeout. + */ +#define assertSoon(predicate) \ + { \ + Timer t; \ + while (!predicate()) { \ + if (t.elapsed() >= kWaitTimeout) { \ + LOGV2_ERROR( \ + 7206701, \ + "assertSoon failed, please check the logs for the reason all attempts have " \ + "failed.", \ + "file"_attr = __FILE__, \ + "line"_attr = __LINE__); \ + FAIL("assertSoon failed"); \ + } \ + } \ + } + +TEST(TicketBrokerTest, BasicTimeout) { + TicketBroker broker; + + stdx::mutex brokerMutex; // NOLINT + + { + stdx::unique_lock lk(brokerMutex); + auto result = + broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Milliseconds{10}); + ASSERT_FALSE(result.hasTicket); + ASSERT_TRUE(result.hasTimedOut); + } + + { + stdx::unique_lock lk(brokerMutex); + auto result = broker.attemptWaitForTicketUntil(std::move(lk), Date_t::min()); + ASSERT_FALSE(result.hasTicket); + ASSERT_TRUE(result.hasTimedOut); + } +} + +TEST(TicketBrokerTest, HandOverWorks) { + TicketBroker broker; + + stdx::mutex brokerMutex; // NOLINT + + { + { + stdx::unique_lock growthLock(brokerMutex); + ASSERT_FALSE(broker.attemptToTransferTicket(growthLock)); + } + + stdx::thread waitingThread([&] { + 
stdx::unique_lock lk(brokerMutex); + auto result = + broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10}); + ASSERT_TRUE(result.hasTicket); + ASSERT_FALSE(result.hasTimedOut); + }); + + assertSoon([&] { + ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 1); + return true; + }); + + { + stdx::unique_lock growthLock(brokerMutex); + ASSERT_TRUE(broker.attemptToTransferTicket(growthLock)); + } + + waitingThread.join(); + } + + { + static constexpr auto threadsToTest = 10; + AtomicWord<int32_t> pendingThreads{threadsToTest}; + std::vector<stdx::thread> threads; + + for (int i = 0; i < threadsToTest; i++) { + threads.emplace_back([&] { + stdx::unique_lock lk(brokerMutex); + auto result = + broker.attemptWaitForTicketUntil(std::move(lk), Date_t::now() + Seconds{10}); + ASSERT_TRUE(result.hasTicket); + ASSERT_FALSE(result.hasTimedOut); + pendingThreads.subtractAndFetch(1); + }); + } + + assertSoon([&] { + ASSERT_SOON_EXP(broker.waitingThreadsRelaxed() == 10); + return true; + }); + + for (int i = 1; i <= threadsToTest; i++) { + stdx::unique_lock growthLock(brokerMutex); + ASSERT_TRUE(broker.attemptToTransferTicket(growthLock)); + assertSoon([&] { + ASSERT_SOON_EXP(pendingThreads.load() == threadsToTest - i); + return true; + }); + } + + for (auto& thread : threads) { + thread.join(); + } + } +} +} // namespace |