diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-03-20 12:54:31 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-06 04:45:54 +0000 |
commit | a29674836e2732727383f3ecbd78f99e790ea1ae (patch) | |
tree | 56cf03f782a3ae49d7293e39fc1f46ef06a045dd | |
parent | 0916dcd9d48917c520553bfab45fcfe5ead85e52 (diff) | |
download | mongo-a29674836e2732727383f3ecbd78f99e790ea1ae.tar.gz |
SERVER-47139 Introduce GuaranteedExecutor class
This commit does the following:
- Introduces the GuaranteedExecutor/GuaranteedExecutorWithFallback class
for when a callback *must* run!
- Centralizes testing Executors in executor_test_util.h!
- Makes a testing suite for InlineCountingExecutor, RejectingExecutor,
and GuaranteedExecutor!
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 30 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/util/executor_test_util.h | 94 | ||||
-rw-r--r-- | src/mongo/util/future_test_utils.h | 28 | ||||
-rw-r--r-- | src/mongo/util/out_of_line_executor.h | 151 | ||||
-rw-r--r-- | src/mongo/util/out_of_line_executor_test.cpp | 201 |
7 files changed, 450 insertions, 57 deletions
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 96ec25ba64d..6a85a5a6a76 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -105,7 +105,7 @@ protected: void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t); private: - std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>(); + std::shared_ptr<OutOfLineExecutor> _executor = InlineCountingExecutor::make(); std::shared_ptr<ConnectionPool> _pool; }; diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 8fec73b5953..78d14c3e77d 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -32,6 +32,7 @@ #include <set> #include "mongo/executor/connection_pool.h" +#include "mongo/util/executor_test_util.h" #include "mongo/util/functional.h" namespace mongo { @@ -138,35 +139,6 @@ private: }; /** - * An "OutOfLineExecutor" that actually runs on the same thread of execution - */ -class InlineOutOfLineExecutor : public OutOfLineExecutor { -public: - void schedule(Task task) override { - // Add the task to our queue - _taskQueue.emplace_back(std::move(task)); - - // Make sure we're not already inline executing - if (std::exchange(_inSchedule, true)) { - return; - } - - // Clear out our queue - while (!_taskQueue.empty()) { - auto task = std::move(_taskQueue.front()); - std::move(task)(Status::OK()); - _taskQueue.pop_front(); - } - - // Admit we're not working on the queue anymore - _inSchedule = false; - } - - bool _inSchedule; - std::deque<Task> _taskQueue; -}; - -/** * Mock for the pool implementation */ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface { diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 46fa8aec9b1..3f3e792c562 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -591,6 +591,7 @@ icuEnv.CppUnitTest( 'lru_cache_test.cpp', 'md5_test.cpp', 'md5main.cpp', + 'out_of_line_executor_test.cpp', 'periodic_runner_impl_test.cpp', 'processinfo_test.cpp', 'procparser_test.cpp' if env.TargetOSIs('linux') else [], diff --git a/src/mongo/util/executor_test_util.h b/src/mongo/util/executor_test_util.h new file mode 100644 index 00000000000..eb64ac85e11 --- /dev/null +++ b/src/mongo/util/executor_test_util.h @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/util/out_of_line_executor.h" + +namespace mongo { +/** + * An "OutOfLineExecutor" that actually runs on the same thread of execution + */ +class InlineCountingExecutor : public OutOfLineExecutor { +public: + void schedule(Task task) override { + // Add the task to our queue + taskQueue.emplace_back(std::move(task)); + + // Make sure that we are not invocing a Task while invocing a Task. Some OutOfLineExecutors + // do recursively dispatch Tasks, however, they also carefully monitor stack depth. For the + // purposes of testing, let's serialize our Tasks. One Task runs at a time. + if (std::exchange(inSchedule, true)) { + return; + } + + ON_BLOCK_EXIT([this] { + // Admit we're not working on the queue anymore + inSchedule = false; + }); + + // Clear out our queue + while (!taskQueue.empty()) { + auto task = std::move(taskQueue.front()); + + // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would + // cause a false negative from TSAN. + tasksRun.fetch_add(1, std::memory_order_relaxed); + task(Status::OK()); + taskQueue.pop_front(); + } + } + + static auto make() { + return std::make_shared<InlineCountingExecutor>(); + } + + bool inSchedule; + + std::deque<Task> taskQueue; + std::atomic<uint32_t> tasksRun{0}; // NOLINT +}; + +class RejectingExecutor final : public OutOfLineExecutor { +public: + void schedule(Task task) noexcept override { + // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would + // cause a false negative from TSAN. + tasksRejected.fetch_add(1, std::memory_order_relaxed); + task(Status(ErrorCodes::ShutdownInProgress, "")); + } + + static auto make() { + return std::make_shared<RejectingExecutor>(); + } + + std::atomic<uint32_t> tasksRejected{0}; // NOLINT +}; + +} // namespace mongo diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h index 6c569a848ed..32aa149a088 100644 --- a/src/mongo/util/future_test_utils.h +++ b/src/mongo/util/future_test_utils.h @@ -34,6 +34,7 @@ #include "mongo/stdx/thread.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/executor_test_util.h" #if !defined(__has_feature) #define __has_feature(x) 0 @@ -50,33 +51,6 @@ enum DoExecutorFuture : bool { kDoExecutorFuture = true, }; -class InlineCountingExecutor final : public OutOfLineExecutor { -public: - void schedule(Task task) noexcept override { - // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would - // cause a false negative from TSAN. - tasksRun.fetch_add(1, std::memory_order_relaxed); - task(Status::OK()); - } - - static auto make() { - return std::make_shared<InlineCountingExecutor>(); - } - - std::atomic<int32_t> tasksRun{0}; // NOLINT -}; - -class RejectingExecutor final : public OutOfLineExecutor { -public: - void schedule(Task task) noexcept override { - task(Status(ErrorCodes::ShutdownInProgress, "")); - } - - static auto make() { - return std::make_shared<RejectingExecutor>(); - } -}; - class DummyInterruptable final : public Interruptible { StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override { diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h index ff1f4a511a1..a5e7bd42483 100644 --- a/src/mongo/util/out_of_line_executor.h +++ b/src/mongo/util/out_of_line_executor.h @@ -35,6 +35,50 @@ namespace mongo { /** + * RunOnceGuard promises that it its run() function is invoked exactly once. + * + * When a RunOnceGuard is constructed, it marks itself as armed. When a RunOnceGuard is moved from, + * it is marked as done. When the RunOnceGuard is destructed, it invariants that it is finished. + * + * The RunOnceGuard is intended to provide an unsynchronized way to validate that a unit of work was + * actually consumed. It can be bound into lambdas or be constructed as a default member of + * parameter objects in work queues or maps. + */ +class RunOnceGuard { + enum class State { + kDone, + kArmed, + }; + +public: + static constexpr const char kRanNeverStr[] = "Function never ran"; + static constexpr const char kRanTwiceStr[] = "Function ran a second time"; + + constexpr RunOnceGuard() : _state{State::kArmed} {} + ~RunOnceGuard() { + invariant(_state == State::kDone, kRanNeverStr); + } + + RunOnceGuard(RunOnceGuard&& other) : _state{std::exchange(other._state, State::kDone)} {} + RunOnceGuard& operator=(RunOnceGuard&& other) noexcept { + invariant(_state == State::kDone, kRanNeverStr); + _state = std::exchange(other._state, State::kDone); + return *this; + } + + RunOnceGuard(const RunOnceGuard&) = delete; + RunOnceGuard& operator=(const RunOnceGuard&) = delete; + + void run() noexcept { + invariant(_state == State::kArmed, kRanTwiceStr); + _state = State::kDone; + } + +private: + State _state; +}; + +/** * Provides the minimal api for a simple out of line executor that can run non-cancellable * callbacks. * @@ -53,6 +97,9 @@ class OutOfLineExecutor { public: using Task = unique_function<void(Status)>; + static constexpr const char kRejectedWorkStr[] = "OutOfLineExecutor rejected work"; + static constexpr const char kNoExecutorStr[] = "Invalid OutOfLineExecutor provided"; + public: /** * Delegates invocation of the Task to this executor @@ -75,4 +122,108 @@ public: using ExecutorPtr = std::shared_ptr<OutOfLineExecutor>; +/** + * A GuaranteedExecutor is a wrapper that ensures its Tasks run exactly once. + * + * If a Task cannot be run, would be destructed without being run, or would run multiple times, it + * will trigger an invariant. + */ +class GuaranteedExecutor final : public OutOfLineExecutor { +public: + explicit GuaranteedExecutor(ExecutorPtr exec) : _exec(std::move(exec)) { + invariant(_exec, kNoExecutorStr); + } + + virtual ~GuaranteedExecutor() = default; + + /** + * Return a wrapped task that is enforced to run once and only once. + */ + static auto enforceRunOnce(Task&& task) noexcept { + return Task([task = std::move(task), guard = RunOnceGuard()](Status status) mutable { + invariant(status, kRejectedWorkStr); + guard.run(); + + auto localTask = std::exchange(task, {}); + localTask(std::move(status)); + }); + } + + void schedule(Task func) override { + // Make sure that the function will be called eventually, once. + auto sureFunc = enforceRunOnce(std::move(func)); + _exec->schedule(std::move(sureFunc)); + } + +private: + ExecutorPtr _exec; +}; + +/** + * A GuaranteedExecutorWithFallback is a wrapper that allows a preferred Executor to pass tasks to a + * fallback. + * + * The GuaranteedExecutorWithFallback uses its _fallback executor when _preferred invokes a Task + * with a not-okay Status. The _fallback executor is a GuaranteedExecutor wrapper, and thus must run + * Tasks under threat of invariant. + */ +class GuaranteedExecutorWithFallback final : public OutOfLineExecutor { +public: + explicit GuaranteedExecutorWithFallback(ExecutorPtr preferred, ExecutorPtr fallback) + : _preferred(std::move(preferred)), _fallback(std::move(fallback)) { + invariant(_preferred, kNoExecutorStr); + // Fallback invariants via GuaranteedExecutor's constructor. + } + + virtual ~GuaranteedExecutorWithFallback() = default; + + void schedule(Task func) override { + _preferred->schedule([func = std::move(func), fallback = _fallback](Status status) mutable { + if (!status.isOK()) { + // This executor has rejected work, send it to the fallback. + fallback.schedule(std::move(func)); + return; + } + + // This executor has accepted work. + func(std::move(status)); + }); + } + +private: + ExecutorPtr _preferred; + GuaranteedExecutor _fallback; +}; + +/** + * Make a GuaranteedExecutor without a fallback. + * + * If exec is invalid, this function will invariant. + */ +inline ExecutorPtr makeGuaranteedExecutor(ExecutorPtr exec) noexcept { + // Note that each GuaranteedExecutor ctor invariants that the pointer is valid. + return std::make_shared<GuaranteedExecutor>(std::move(exec)); +} + +/** + * Make either a GuaranteedExecutor or a GuaranteedExecutorWithFallback. + * + * If preferred is invalid and fallback is valid, this creates a GuaranteedExecutor from fallback. + * If fallback is invalid and preferred is valid, this creates a GuaranteedExecutor from preferred. + * If both preferred and fallback are invalid, this function will invariant. + */ +inline ExecutorPtr makeGuaranteedExecutor(ExecutorPtr preferred, ExecutorPtr fallback) noexcept { + // Note that each GuaranteedExecutor ctor invariants that the pointer is valid. + if (!preferred) { + return makeGuaranteedExecutor(std::move(fallback)); + } + + if (!fallback) { + return makeGuaranteedExecutor(std::move(preferred)); + } + + return std::make_shared<GuaranteedExecutorWithFallback>(std::move(preferred), + std::move(fallback)); +} + } // namespace mongo diff --git a/src/mongo/util/out_of_line_executor_test.cpp b/src/mongo/util/out_of_line_executor_test.cpp new file mode 100644 index 00000000000..29fa1892720 --- /dev/null +++ b/src/mongo/util/out_of_line_executor_test.cpp @@ -0,0 +1,201 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/util/out_of_line_executor.h" + +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/executor_test_util.h" + +namespace mongo { +namespace { + +TEST(ExecutorTest, RejectingExecutor) { + // Verify that the executor rejects every time and keeps an accurate count. + const auto exec = RejectingExecutor::make(); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + exec->schedule([&](Status error) { + ASSERT_NOT_OK(error); + ASSERT_EQ(exec->tasksRejected.load(), (i + 1)); + }); + } +} + +TEST(ExecutorTest, InlineCountingExecutor) { + // Verify that the executor accepts every time and keeps an accurate count. + const auto execA = InlineCountingExecutor::make(); + const auto execB = InlineCountingExecutor::make(); + + // Using prime numbers so there is no chance of multiple traps + static constexpr size_t kCountA = 1013; + static constexpr size_t kCountB = 1511; + + // Schedule kCountA tasks one at a time. + for (size_t i = 0; i < kCountA; ++i) { + execA->schedule([&](Status status) { + ASSERT_OK(status); + ASSERT_EQ(execA->tasksRun.load(), (i + 1)); + }); + } + + { + // Schedule kCountB tasks recursively. + size_t i = 0; + std::function<void(Status)> recurseExec; + bool inTask = false; + + recurseExec = [&](Status status) { + ASSERT(!std::exchange(inTask, true)); + ASSERT_OK(status); + + auto tasksRun = execB->tasksRun.load(); + ASSERT_EQ(tasksRun, ++i); + if (tasksRun < kCountB) { + execB->schedule(recurseExec); + } + + ASSERT(std::exchange(inTask, false)); + }; + + execB->schedule(recurseExec); + } + + // Verify that running executors together didn't change the expected counts. + ASSERT_EQ(execA->tasksRun.load(), kCountA); + ASSERT_EQ(execB->tasksRun.load(), kCountB); +} + +DEATH_TEST(ExecutorTest, + GuaranteedExecutor_MainInvalid_FallbackInvalid, + GuaranteedExecutor::kNoExecutorStr) { + // If no executor was provided, then we invariant. + const auto gwarExec = makeGuaranteedExecutor({}); +} + +DEATH_TEST(ExecutorTest, + GuaranteedExecutor_MainInvalid_FallbackRejects, + GuaranteedExecutor::kRejectedWorkStr) { + // If we have a fallback and it rejects work, then we invariant. + const auto gwarExec = makeGuaranteedExecutor({}, RejectingExecutor::make()); + gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); }); +} + +TEST(ExecutorTest, GuaranteedExecutor_MainInvalid_FallbackAccepts) { + // If we only have a fallback, then everything runs on it. + const auto countExec = InlineCountingExecutor::make(); + const auto gwarExec = makeGuaranteedExecutor({}, countExec); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + gwarExec->schedule([&](Status status) { ASSERT_OK(status); }); + } + + ASSERT_EQ(countExec->tasksRun.load(), kCount); +} + +DEATH_TEST(ExecutorTest, + GuaranteedExecutor_MainRejects_FallbackInvalid, + GuaranteedExecutor::kRejectedWorkStr) { + // If we only have a main executor and it rejects work, then we invariant. + const auto gwarExec = makeGuaranteedExecutor(RejectingExecutor::make()); + gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); }); +} + +DEATH_TEST(ExecutorTest, + GuaranteedExecutor_MainRejects_FallbackRejects, + GuaranteedExecutor::kRejectedWorkStr) { + // If we have a main and a fallback and both reject work, then we invariant. + const auto gwarExec = + makeGuaranteedExecutor(RejectingExecutor::make(), RejectingExecutor::make()); + gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); }); +} + +TEST(ExecutorTest, GuaranteedExecutor_MainRejects_FallbackAccepts) { + // If the main rejects and the fallback accepts, then run on the fallback. + const auto rejectExec = RejectingExecutor::make(); + const auto countExec = InlineCountingExecutor::make(); + const auto gwarExec = makeGuaranteedExecutor(rejectExec, countExec); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + gwarExec->schedule([&](Status status) { ASSERT_OK(status); }); + } + ASSERT_EQ(rejectExec->tasksRejected.load(), kCount); + ASSERT_EQ(countExec->tasksRun.load(), kCount); +} + +TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackInvalid) { + // If the main accepts and we don't have a fallback, then run on the main. + const auto countExec = InlineCountingExecutor::make(); + const auto gwarExec = makeGuaranteedExecutor(countExec, {}); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + gwarExec->schedule([&](Status status) { ASSERT_OK(status); }); + } + + ASSERT_EQ(countExec->tasksRun.load(), kCount); +} + +TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackRejects) { + // If the main accepts and the fallback would reject, then run on the main. + const auto countExec = InlineCountingExecutor::make(); + const auto rejectExec = RejectingExecutor::make(); + const auto gwarExec = makeGuaranteedExecutor(countExec, rejectExec); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + gwarExec->schedule([&](Status status) { ASSERT_OK(status); }); + } + + ASSERT_EQ(countExec->tasksRun.load(), kCount); + ASSERT_EQ(rejectExec->tasksRejected.load(), 0); +} + +TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackAccepts) { + // If both executor accepts, then run on the main. + const auto countExecA = InlineCountingExecutor::make(); + const auto countExecB = InlineCountingExecutor::make(); + const auto gwarExec = makeGuaranteedExecutor(countExecA, countExecB); + + static constexpr size_t kCount = 1000; + for (size_t i = 0; i < kCount; ++i) { + gwarExec->schedule([&](Status status) { ASSERT_OK(status); }); + } + + ASSERT_EQ(countExecA->tasksRun.load(), kCount); + ASSERT_EQ(countExecB->tasksRun.load(), 0); +} + +} // namespace +} // namespace mongo |