summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2020-11-20 06:37:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-24 17:48:16 +0000
commit5e82f974f44a822753ad0656692a1011a6611b3c (patch)
tree887c80ef522d7667942690e66524e04557a49578
parentbb840568d976f687fb8e0baa24e37a7827b06de4 (diff)
downloadmongo-5e82f974f44a822753ad0656692a1011a6611b3c.tar.gz
SERVER-53035 ThreadAssertionMonitor
-rw-r--r--src/mongo/unittest/SConscript1
-rw-r--r--src/mongo/unittest/thread_assertion_monitor.h140
-rw-r--r--src/mongo/unittest/thread_assertion_monitor_test.cpp99
-rw-r--r--src/mongo/util/future_test_shared_future.cpp168
4 files changed, 332 insertions, 76 deletions
diff --git a/src/mongo/unittest/SConscript b/src/mongo/unittest/SConscript
index 7b7e09341dd..680eb3ae258 100644
--- a/src/mongo/unittest/SConscript
+++ b/src/mongo/unittest/SConscript
@@ -99,6 +99,7 @@ env.CppUnitTest(
'unittest_test.cpp',
'fixture_test.cpp',
'temp_dir_test.cpp',
+ 'thread_assertion_monitor_test.cpp',
],
)
diff --git a/src/mongo/unittest/thread_assertion_monitor.h b/src/mongo/unittest/thread_assertion_monitor.h
new file mode 100644
index 00000000000..157eb3c4885
--- /dev/null
+++ b/src/mongo/unittest/thread_assertion_monitor.h
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <exception>
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo::unittest {
+
+/**
+ * Worker threads cannot use ASSERT exceptions, because they'll normally crash the
+ * process. We provide a way for a worker to transfer ASSERT failures to the
+ * main thread to be rethrown from there.
+ *
+ * The main thread has to be structured such that it's waiting
+ * on this monitor while all other work happens in workers.
+ */
+class ThreadAssertionMonitor {
+
+public:
+ // Monitor will `wait()` on destruction, blocking until a `notifyDone()` call has occurred.
+ ~ThreadAssertionMonitor() noexcept(false) {
+ wait();
+ }
+
+ /** Spawn and return a `stdx::thread` that invokes `f` as if by `exec(f)`. */
+ template <typename F>
+ stdx::thread spawn(F&& f) {
+ return stdx::thread{[this, f = std::move(f)]() mutable { exec(std::move(f)); }};
+ }
+
+ /** Spawn a thread that will invoke monitor.notifyDone()` when it finishes. */
+ template <typename F>
+ stdx::thread spawnController(F&& f) {
+ return spawn([this, f = std::move(f)]() mutable {
+ auto notifyDoneGuard = makeGuard([this] { notifyDone(); });
+ exec(std::move(f));
+ });
+ }
+
+ /** Invokes `f` inside a try/catch that routes any ASSERT failures to the monitor. */
+ template <typename F>
+ void exec(F&& f) {
+ try {
+ std::invoke(std::forward<F>(f));
+ } catch (const unittest::TestAssertionFailureException&) {
+ // Transport ASSERT failures to the monitor.
+ bool notify = false;
+ {
+ stdx::unique_lock lk(_mu); // NOLINT
+ if (!_ex) {
+ _ex = std::current_exception();
+ notify = true;
+ }
+ }
+ if (notify)
+ _cv.notify_one();
+ }
+ }
+
+ void notifyDone() {
+ {
+ stdx::unique_lock lk(_mu);
+ _done = true;
+ }
+ _cv.notify_one();
+ }
+
+ // Blocks until `notifyDone` is called.
+ // Throws if an ASSERT exception was reported by any exec invocation.
+ void wait() {
+ stdx::unique_lock lk(_mu);
+ do {
+ _cv.wait(lk, [&] { return _done || _ex; });
+ if (_ex)
+ std::rethrow_exception(std::exchange(_ex, nullptr));
+ } while (!_done);
+ }
+
+private:
+ stdx::mutex _mu; // NOLINT
+ stdx::condition_variable _cv;
+ std::exception_ptr _ex;
+ bool _done = false;
+};
+
+/**
+ * Covers probably most cases of multithreaded tests.
+ * The body of a test can be passed in as an `f` that
+ * accepts a `unittest::ThreadAssertionMonitor&`.
+ * `f(monitor)` will to become the body of a "controller" thread.
+ *
+ * Any `stdx::thread(...)` constructors in the test must then be
+ * converted to `monitor.spawn(...)` calls. Then the ASSERT
+ * calls in the test will be managed by the monitor and propagate
+ * to be rethrown from the main thread, cleanly failing the unit test.
+ *
+ * The controller thread is joined inside this function,
+ * but user code is still responsible for joining the spawned
+ * worker threads as usual.
+ */
+template <typename F>
+void threadAssertionMonitoredTest(F&& f) {
+ ThreadAssertionMonitor monitor;
+ monitor.spawnController([&, f] { std::invoke(f, monitor); }).join();
+}
+
+} // namespace mongo::unittest
diff --git a/src/mongo/unittest/thread_assertion_monitor_test.cpp b/src/mongo/unittest/thread_assertion_monitor_test.cpp
new file mode 100644
index 00000000000..f20257e2030
--- /dev/null
+++ b/src/mongo/unittest/thread_assertion_monitor_test.cpp
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/unittest/thread_assertion_monitor.h"
+
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo::unittest {
+namespace {
+
+TEST(ThreadAssertionMonitor, Trivial) {
+ ThreadAssertionMonitor monitor;
+ monitor.notifyDone(); // Somebody always has to call notifyDone or dtor will wait forever.
+}
+
+TEST(ThreadAssertionMonitor, ControllerInStdxThread) {
+ ThreadAssertionMonitor monitor;
+ stdx::thread{[&] { monitor.notifyDone(); }}.join();
+}
+
+TEST(ThreadAssertionMonitor, OnlyControllerInSpawn) {
+ ThreadAssertionMonitor monitor;
+ monitor.spawn([&] { monitor.notifyDone(); }).join();
+}
+
+TEST(ThreadAssertionMonitor, OnlyControllerInSpawnController) {
+ ThreadAssertionMonitor monitor;
+ monitor.spawnController([] {}).join();
+}
+
+TEST(ThreadAssertionMonitor, WorkerExecOk) {
+ ThreadAssertionMonitor monitor;
+ monitor.spawnController([&] { monitor.spawn([&] { ASSERT_EQ(1, 1) << "Worker ok"; }).join(); })
+ .join();
+}
+
+TEST(ThreadAssertionMonitor, WorkerExecFail) {
+ ThreadAssertionMonitor monitor;
+ monitor.spawnController([&] { monitor.spawn([&] { ASSERT_EQ(1, 2) << "Oops"; }).join(); })
+ .join();
+ try {
+ monitor.wait();
+ FAIL("Expected monitor.wait() to throw");
+ } catch (const TestAssertionFailureException& ex) {
+ ASSERT_STRING_SEARCH_REGEX(ex.what(), "Oops");
+ }
+ LOGV2_INFO(5182100, "monitor.wait finished");
+}
+
+TEST(ThreadAssertionMonitor, ThreadAssertionMonitoredTestTrivial) {
+ threadAssertionMonitoredTest([](auto&) {});
+}
+
+TEST(ThreadAssertionMonitor, ThreadAssertionMonitoredTestPassing) {
+ threadAssertionMonitoredTest([](auto& monitor) { monitor.spawn([&] {}).join(); });
+}
+
+TEST(ThreadAssertionMonitor, ThreadAssertionMonitoredTestFailing) {
+ try {
+ threadAssertionMonitoredTest(
+ [](auto& monitor) { monitor.spawn([&] { ASSERT_EQ(1, 2) << "Oops"; }).join(); });
+ FAIL("Expected threadAssertionMonitoredTest to throw");
+ } catch (const TestAssertionFailureException& ex) {
+ ASSERT_STRING_SEARCH_REGEX(ex.what(), "Oops");
+ }
+}
+
+
+} // namespace
+} // namespace mongo::unittest
diff --git a/src/mongo/util/future_test_shared_future.cpp b/src/mongo/util/future_test_shared_future.cpp
index 656d40424a8..a8cfea9825f 100644
--- a/src/mongo/util/future_test_shared_future.cpp
+++ b/src/mongo/util/future_test_shared_future.cpp
@@ -31,6 +31,7 @@
#include "mongo/util/future.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/future_test_utils.h"
@@ -279,95 +280,110 @@ TEST(SharedFuture, InterruptedGet_AddChild_Get) {
});
}
-TEST(SharedFuture, ConcurrentTest_OneSharedFuture) {
- auto nTries = 16;
- while (nTries--) {
- const auto nThreads = 16;
- auto threads = std::vector<stdx::thread>(nThreads);
-
- SharedPromise<void> promise;
-
- auto shared = promise.getFuture();
-
- for (int i = 0; i < nThreads; i++) {
- threads[i] = stdx::thread([i, &shared] {
- auto exec = InlineRecursiveCountingExecutor::make();
- if (i % 5 == 0) {
- // just wait directly on shared.
- shared.get();
- } else if (i % 7 == 0) {
- // interrupted wait, then blocking wait.
- DummyInterruptable dummyInterruptable;
- auto res = shared.waitNoThrow(&dummyInterruptable);
- if (!shared.isReady()) {
- ASSERT_EQ(res, ErrorCodes::Interrupted);
- }
- shared.get();
- } else if (i % 2 == 0) {
- // add a child.
- shared.thenRunOn(exec).then([] {}).get();
- } else {
- // add a grand child.
- shared.thenRunOn(exec).share().thenRunOn(exec).then([] {}).get();
- }
- });
- }
-
- if (nTries % 2 == 0)
- stdx::this_thread::yield(); // Slightly increase the chance of racing.
-
- promise.emplaceValue();
-
- for (auto&& thread : threads) {
- thread.join();
+/** Punt until we have `std::jthread`. Joins itself in destructor. Move-only. */
+class JoinThread : public stdx::thread {
+public:
+ explicit JoinThread(stdx::thread thread) : stdx::thread(std::move(thread)) {}
+ JoinThread(const JoinThread&) = delete;
+ JoinThread& operator=(const JoinThread&) = delete;
+ JoinThread(JoinThread&&) noexcept = default;
+ JoinThread& operator=(JoinThread&&) noexcept = default;
+ ~JoinThread() {
+ if (joinable())
+ join();
+ }
+};
+
+void sharedFutureTestWorker(size_t i, SharedSemiFuture<void>& shared) {
+ auto exec = InlineRecursiveCountingExecutor::make();
+ if (i % 5 == 0) {
+ // just wait directly on shared.
+ shared.get();
+ } else if (i % 7 == 0) {
+ // interrupted wait, then blocking wait.
+ DummyInterruptable dummyInterruptable;
+ auto res = shared.waitNoThrow(&dummyInterruptable);
+ if (!shared.isReady()) {
+ ASSERT_EQ(res, ErrorCodes::Interrupted);
}
+ shared.get();
+ } else if (i % 2 == 0) {
+ // add a child.
+ shared.thenRunOn(exec).then([] {}).get();
+ } else {
+ // add a grand child.
+ shared.thenRunOn(exec).share().thenRunOn(exec).then([] {}).get();
}
}
-TEST(SharedFuture, ConcurrentTest_ManySharedFutures) {
- auto nTries = 16;
- while (nTries--) {
- const auto nThreads = 16;
- auto threads = std::vector<stdx::thread>(nThreads);
+/**
+ * Define a common structure between `ConcurrentTest_OneSharedFuture` and
+ * `ConcurrentTest_ManySharedFutures`. They can vary only in the ways specified
+ * by the `policy` hooks. The `policy` object defines per-try state (returned by
+ * `onTryBegin`), and then per-thread state within each worker thread of each
+ * try (returned by `onThreadBegin`). We want to ensure that the SharedPromise
+ * API works the same whether you make multiple calls to getFuture() or just
+ * one and copy it around.
+ */
+template <typename Policy>
+void sharedFutureConcurrentTest(unittest::ThreadAssertionMonitor& monitor, Policy& policy) {
+ const size_t nTries = 16;
+ for (size_t tryCount = 0; tryCount < nTries; ++tryCount) {
+ const size_t nThreads = 16;
SharedPromise<void> promise;
-
- for (int i = 0; i < nThreads; i++) {
- threads[i] = stdx::thread([i, &promise] {
- auto shared = promise.getFuture();
- auto exec = InlineRecursiveCountingExecutor::make();
-
- if (i % 5 == 0) {
- // just wait directly on shared.
- shared.get();
- } else if (i % 7 == 0) {
- // interrupted wait, then blocking wait.
- DummyInterruptable dummyInterruptable;
- auto res = shared.waitNoThrow(&dummyInterruptable);
- if (!shared.isReady()) {
- ASSERT_EQ(res, ErrorCodes::Interrupted);
- }
- shared.get();
- } else if (i % 2 == 0) {
- // add a child.
- shared.thenRunOn(exec).then([] {}).get();
- } else {
- // add a grand child.
- shared.thenRunOn(exec).share().thenRunOn(exec).then([] {}).get();
- }
- });
+ std::vector<JoinThread> threads;
+
+ auto&& tryState = policy.onTryBegin(promise);
+ for (size_t i = 0; i < nThreads; i++) {
+ threads.push_back(JoinThread{monitor.spawn([&, i] {
+ auto&& shared = policy.onThreadBegin(tryState);
+ sharedFutureTestWorker(i, shared);
+ })});
}
- if (nTries % 2 == 0)
+ if (tryCount % 2 == 0)
stdx::this_thread::yield(); // Slightly increase the chance of racing.
promise.emplaceValue();
-
- for (auto&& thread : threads) {
- thread.join();
- }
}
}
+/**
+ * Make a SharedSemiFuture from the SharedPromise at the beginning of each try.
+ * Use that same object in all of the worker threads.
+ */
+TEST(SharedFuture, ConcurrentTest_OneSharedFuture) {
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ struct {
+ decltype(auto) onTryBegin(SharedPromise<void>& promise) {
+ return promise.getFuture();
+ }
+ decltype(auto) onThreadBegin(SharedSemiFuture<void>& shared) {
+ return shared;
+ }
+ } policy;
+ sharedFutureConcurrentTest(monitor, policy);
+ });
+}
+
+/**
+ * Retain a SharedPromise through all the tries.
+ * Peel multiple SharedSemiFuture from it, one per worker thread.
+ */
+TEST(SharedFuture, ConcurrentTest_ManySharedFutures) {
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ struct {
+ decltype(auto) onTryBegin(SharedPromise<void>& promise) {
+ return promise;
+ }
+ decltype(auto) onThreadBegin(SharedPromise<void>& promise) {
+ return promise.getFuture();
+ }
+ } policy;
+ sharedFutureConcurrentTest(monitor, policy);
+ });
+}
+
} // namespace
} // namespace mongo