summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticket_broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency/ticket_broker.cpp')
-rw-r--r--src/mongo/util/concurrency/ticket_broker.cpp225
1 files changed, 225 insertions, 0 deletions
diff --git a/src/mongo/util/concurrency/ticket_broker.cpp b/src/mongo/util/concurrency/ticket_broker.cpp
new file mode 100644
index 00000000000..a9d428691b0
--- /dev/null
+++ b/src/mongo/util/concurrency/ticket_broker.cpp
@@ -0,0 +1,225 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/util/concurrency/ticket_broker.h"
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/errno_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+// TODO SERVER-72616: Remove futex usage from this class in favour of atomic waits.
+#include <linux/futex.h> /* Definition of FUTEX_* constants */
+#include <sys/syscall.h> /* Definition of SYS_* constants */
+#include <unistd.h>
+
+namespace mongo {
+namespace {
+static int futex(uint32_t* uaddr,
+ int futex_op,
+ uint32_t val,
+ const struct timespec* timeout,
+ uint32_t* uaddr2,
+ uint32_t val3) noexcept {
+ return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+}
+
+// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::wait exists
+static stdx::cv_status atomic_wait(AtomicWord<uint32_t>& atomic,
+ uint32_t expectedValue,
+ Date_t until) noexcept {
+ while (atomic.load() == expectedValue) {
+ // Prepare the timeout value for the futex call.
+ timespec ts;
+ auto now = Date_t::now();
+ if (now >= until) {
+ return stdx::cv_status::timeout;
+ }
+ auto millis = until - now;
+ ts.tv_sec = millis.count() / 1'000;
+ ts.tv_nsec = (millis.count() % 1'000) * 1'000'000;
+
+ auto futexResult = futex(reinterpret_cast<uint32_t*>(&atomic),
+ FUTEX_WAIT_PRIVATE,
+ expectedValue,
+ &ts,
+ nullptr,
+ 0);
+ if (futexResult != 0) {
+ switch (errno) {
+ // The value has changed before we called futex wait, we treat this as a
+ // notification and exit.
+ case EAGAIN:
+ return stdx::cv_status::no_timeout;
+ case ETIMEDOUT:
+ return stdx::cv_status::timeout;
+ // We ignore signal interruptions as other signals are handled by either crashing
+ // the server or gracefully exiting the server and waiting for operations to finish.
+ case EINTR:
+ break;
+ // All other errors are unrecoverable, fassert and crash the server.
+ default: {
+ LOGV2_FATAL(7206704,
+ "Error in atomic wait for ticket",
+ "error"_attr = errorMessage(posixError(errno)));
+ }
+ }
+ }
+ }
+ return stdx::cv_status::no_timeout;
+}
+
+// TODO SERVER-72616: This can go away once we're on C++20 and std::atomic<T>::notify_one exists.
+static void atomic_notify_one(AtomicWord<uint32_t>& atomic) noexcept {
+ auto result =
+ futex(reinterpret_cast<uint32_t*>(&atomic), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
+ if (result < 0) {
+ // We treat possible errors here as a server crash since we cannot recover from them.
+ LOGV2_FATAL(7206703,
+ "Error in atomic notify for ticket",
+ "error"_attr = errorMessage(posixError(errno)));
+ }
+}
+} // namespace
+
+TicketBroker::TicketBroker() : _queueBegin(nullptr), _queueEnd(nullptr), _numQueued(0) {}
+
+void TicketBroker::_registerAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
+ Node& node) noexcept {
+ // We register the node.
+ _numQueued.fetchAndAdd(1);
+
+ if (_queueBegin == nullptr) {
+ // If the list is empty we are the first node.
+ _queueBegin = &node;
+ _queueEnd = &node;
+ } else {
+ // Otherwise we're the new end and must link the preceding node to us, and us to the
+ // preceding node.
+ _queueEnd->next = &node;
+ node.previous = _queueEnd;
+ _queueEnd = &node;
+ }
+}
+
+void TicketBroker::_unregisterAsWaiter(const stdx::unique_lock<stdx::mutex>& growthLock,
+ Node& node) noexcept {
+ // We've been unregistered by a ticket transfer, nothing to do as the transferer already removed
+ // us.
+ if (node.futexWord.loadRelaxed() != 0) {
+ return;
+ }
+
+ auto previousLength = _numQueued.fetchAndSubtract(1);
+ // If there was only 1 node it was us, the queue will now be empty.
+ if (previousLength == 1) {
+ _queueBegin = _queueEnd = nullptr;
+ return;
+ }
+ // If the beginning of the linked list is this node we advance it to the next element.
+ if (_queueBegin == &node) {
+ _queueBegin = node.next;
+ node.next->previous = nullptr;
+ return;
+ }
+
+ // If the end of the queue is this node, then the new end is the preceding node.
+ if (_queueEnd == &node) {
+ _queueEnd = node.previous;
+ node.previous->next = nullptr;
+ return;
+ }
+
+ // Otherwise we're in the middle of the list. Preceding and successive nodes must be updated
+ // accordingly.
+ node.previous->next = node.next;
+ node.next->previous = node.previous;
+}
+
+TicketBroker::WaitingResult TicketBroker::attemptWaitForTicketUntil(
+ stdx::unique_lock<stdx::mutex> growthLock, Date_t until) noexcept {
+ // Stack allocate the node of the linked list, this approach lets us ignore heap memory in
+ // favour of stack memory which is dramatically cheaper. Care must be taken to ensure that there
+ // are no references left in the queue to this node once returning from the method.
+ //
+ // If std::promise didn't perform a heap allocation we could use it here.
+ Node node;
+
+ // We add ourselves as a waiter, we are still holding the lock here.
+ _registerAsWaiter(growthLock, node);
+
+ // Finished modifying the linked list, the lock can be released now.
+ growthLock.unlock();
+
+ // We now wait until obtaining the notification via the futex word.
+ auto waitResult = atomic_wait(node.futexWord, 0, until);
+ bool hasTimedOut = waitResult == stdx::cv_status::timeout;
+
+ if (hasTimedOut) {
+ // Timing out implies that the node must be removed from the list, block list modifications
+ // to prevent segmentation faults.
+ growthLock.lock();
+ _unregisterAsWaiter(growthLock, node);
+ growthLock.unlock();
+ }
+
+ // If we haven't timed out it means that the ticket has been transferred to our node. The
+ // transfer method removes the node from the linked list, so there's no cleanup to be done.
+
+ auto hasTicket = node.futexWord.load() != 0;
+
+ return TicketBroker::WaitingResult{hasTimedOut, hasTicket};
+}
+
+bool TicketBroker::attemptToTransferTicket(
+ const stdx::unique_lock<stdx::mutex>& growthLock) noexcept {
+ // We can only transfer a ticket if there is a thread waiting for it.
+ if (_numQueued.loadRelaxed() > 0) {
+ _numQueued.fetchAndSubtract(1);
+
+ // We notify the first element in the queue. To avoid race conditions we first remove the
+ // node and then notify the waiting thread. Doing the opposite risks a segmentation fault if
+ // the node gets deallocated before we remove it from the list.
+ auto node = _queueBegin;
+ _queueBegin = node->next;
+ if (_queueBegin) {
+ // Next node isn't empty, we must inform it that it's first in line.
+ _queueBegin->previous = nullptr;
+ }
+ auto& futexAtomic = node->futexWord;
+ futexAtomic.store(1);
+ // We've transferred a ticket and removed the node from the list, inform the waiting thread
+ // that it can proceed.
+ atomic_notify_one(futexAtomic);
+ return true;
+ }
+ return false;
+}
+
+} // namespace mongo