summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-03-20 12:54:31 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-26 22:38:40 +0000
commit491ab3f67681e83f4184f4ffce07c6c53d9441d9 (patch)
treedaefd85ae4771d9b8df8799d18118852bc74a2ab
parent12e5e201689e082f1b3a43f14bc2adccf4f75875 (diff)
downloadmongo-491ab3f67681e83f4184f4ffce07c6c53d9441d9.tar.gz
SERVER-47139 Introduce GuaranteedExecutor class
This commit does the following: - Introduces the GuaranteedExecutor/GuaranteedExecutorWithFallback class for when a callback *must* run! - Centralizes testing Executors in executor_test_util.h! - Makes a testing suite for InlineCountingExecutor, RejectingExecutor, and GuaranteedExecutor!
-rw-r--r--src/mongo/executor/connection_pool_test.cpp2
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h30
-rw-r--r--src/mongo/util/SConscript1
-rw-r--r--src/mongo/util/executor_test_util.h94
-rw-r--r--src/mongo/util/future_test_utils.h28
-rw-r--r--src/mongo/util/out_of_line_executor.h151
-rw-r--r--src/mongo/util/out_of_line_executor_test.cpp201
7 files changed, 450 insertions, 57 deletions
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 96ec25ba64d..6a85a5a6a76 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -105,7 +105,7 @@ protected:
void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t);
private:
- std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
+ std::shared_ptr<OutOfLineExecutor> _executor = InlineCountingExecutor::make();
std::shared_ptr<ConnectionPool> _pool;
};
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 8fec73b5953..78d14c3e77d 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -32,6 +32,7 @@
#include <set>
#include "mongo/executor/connection_pool.h"
+#include "mongo/util/executor_test_util.h"
#include "mongo/util/functional.h"
namespace mongo {
@@ -138,35 +139,6 @@ private:
};
/**
- * An "OutOfLineExecutor" that actually runs on the same thread of execution
- */
-class InlineOutOfLineExecutor : public OutOfLineExecutor {
-public:
- void schedule(Task task) override {
- // Add the task to our queue
- _taskQueue.emplace_back(std::move(task));
-
- // Make sure we're not already inline executing
- if (std::exchange(_inSchedule, true)) {
- return;
- }
-
- // Clear out our queue
- while (!_taskQueue.empty()) {
- auto task = std::move(_taskQueue.front());
- std::move(task)(Status::OK());
- _taskQueue.pop_front();
- }
-
- // Admit we're not working on the queue anymore
- _inSchedule = false;
- }
-
- bool _inSchedule;
- std::deque<Task> _taskQueue;
-};
-
-/**
* Mock for the pool implementation
*/
class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 46fa8aec9b1..3f3e792c562 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -591,6 +591,7 @@ icuEnv.CppUnitTest(
'lru_cache_test.cpp',
'md5_test.cpp',
'md5main.cpp',
+ 'out_of_line_executor_test.cpp',
'periodic_runner_impl_test.cpp',
'processinfo_test.cpp',
'procparser_test.cpp' if env.TargetOSIs('linux') else [],
diff --git a/src/mongo/util/executor_test_util.h b/src/mongo/util/executor_test_util.h
new file mode 100644
index 00000000000..eb64ac85e11
--- /dev/null
+++ b/src/mongo/util/executor_test_util.h
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/util/out_of_line_executor.h"
+
+namespace mongo {
+/**
+ * An "OutOfLineExecutor" that actually runs on the same thread of execution
+ */
+class InlineCountingExecutor : public OutOfLineExecutor {
+public:
+ void schedule(Task task) override {
+ // Add the task to our queue
+ taskQueue.emplace_back(std::move(task));
+
+ // Make sure that we are not invocing a Task while invocing a Task. Some OutOfLineExecutors
+ // do recursively dispatch Tasks, however, they also carefully monitor stack depth. For the
+ // purposes of testing, let's serialize our Tasks. One Task runs at a time.
+ if (std::exchange(inSchedule, true)) {
+ return;
+ }
+
+ ON_BLOCK_EXIT([this] {
+ // Admit we're not working on the queue anymore
+ inSchedule = false;
+ });
+
+ // Clear out our queue
+ while (!taskQueue.empty()) {
+ auto task = std::move(taskQueue.front());
+
+ // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would
+ // cause a false negative from TSAN.
+ tasksRun.fetch_add(1, std::memory_order_relaxed);
+ task(Status::OK());
+ taskQueue.pop_front();
+ }
+ }
+
+ static auto make() {
+ return std::make_shared<InlineCountingExecutor>();
+ }
+
+ bool inSchedule;
+
+ std::deque<Task> taskQueue;
+ std::atomic<uint32_t> tasksRun{0}; // NOLINT
+};
+
+class RejectingExecutor final : public OutOfLineExecutor {
+public:
+ void schedule(Task task) noexcept override {
+ // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would
+ // cause a false negative from TSAN.
+ tasksRejected.fetch_add(1, std::memory_order_relaxed);
+ task(Status(ErrorCodes::ShutdownInProgress, ""));
+ }
+
+ static auto make() {
+ return std::make_shared<RejectingExecutor>();
+ }
+
+ std::atomic<uint32_t> tasksRejected{0}; // NOLINT
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h
index 6c569a848ed..32aa149a088 100644
--- a/src/mongo/util/future_test_utils.h
+++ b/src/mongo/util/future_test_utils.h
@@ -34,6 +34,7 @@
#include "mongo/stdx/thread.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/executor_test_util.h"
#if !defined(__has_feature)
#define __has_feature(x) 0
@@ -50,33 +51,6 @@ enum DoExecutorFuture : bool {
kDoExecutorFuture = true,
};
-class InlineCountingExecutor final : public OutOfLineExecutor {
-public:
- void schedule(Task task) noexcept override {
- // Relaxed to avoid adding synchronization where there otherwise wouldn't be. That would
- // cause a false negative from TSAN.
- tasksRun.fetch_add(1, std::memory_order_relaxed);
- task(Status::OK());
- }
-
- static auto make() {
- return std::make_shared<InlineCountingExecutor>();
- }
-
- std::atomic<int32_t> tasksRun{0}; // NOLINT
-};
-
-class RejectingExecutor final : public OutOfLineExecutor {
-public:
- void schedule(Task task) noexcept override {
- task(Status(ErrorCodes::ShutdownInProgress, ""));
- }
-
- static auto make() {
- return std::make_shared<RejectingExecutor>();
- }
-};
-
class DummyInterruptable final : public Interruptible {
StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override {
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index ff1f4a511a1..a5e7bd42483 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -35,6 +35,50 @@
namespace mongo {
/**
+ * RunOnceGuard promises that it its run() function is invoked exactly once.
+ *
+ * When a RunOnceGuard is constructed, it marks itself as armed. When a RunOnceGuard is moved from,
+ * it is marked as done. When the RunOnceGuard is destructed, it invariants that it is finished.
+ *
+ * The RunOnceGuard is intended to provide an unsynchronized way to validate that a unit of work was
+ * actually consumed. It can be bound into lambdas or be constructed as a default member of
+ * parameter objects in work queues or maps.
+ */
+class RunOnceGuard {
+ enum class State {
+ kDone,
+ kArmed,
+ };
+
+public:
+ static constexpr const char kRanNeverStr[] = "Function never ran";
+ static constexpr const char kRanTwiceStr[] = "Function ran a second time";
+
+ constexpr RunOnceGuard() : _state{State::kArmed} {}
+ ~RunOnceGuard() {
+ invariant(_state == State::kDone, kRanNeverStr);
+ }
+
+ RunOnceGuard(RunOnceGuard&& other) : _state{std::exchange(other._state, State::kDone)} {}
+ RunOnceGuard& operator=(RunOnceGuard&& other) noexcept {
+ invariant(_state == State::kDone, kRanNeverStr);
+ _state = std::exchange(other._state, State::kDone);
+ return *this;
+ }
+
+ RunOnceGuard(const RunOnceGuard&) = delete;
+ RunOnceGuard& operator=(const RunOnceGuard&) = delete;
+
+ void run() noexcept {
+ invariant(_state == State::kArmed, kRanTwiceStr);
+ _state = State::kDone;
+ }
+
+private:
+ State _state;
+};
+
+/**
* Provides the minimal api for a simple out of line executor that can run non-cancellable
* callbacks.
*
@@ -53,6 +97,9 @@ class OutOfLineExecutor {
public:
using Task = unique_function<void(Status)>;
+ static constexpr const char kRejectedWorkStr[] = "OutOfLineExecutor rejected work";
+ static constexpr const char kNoExecutorStr[] = "Invalid OutOfLineExecutor provided";
+
public:
/**
* Delegates invocation of the Task to this executor
@@ -75,4 +122,108 @@ public:
using ExecutorPtr = std::shared_ptr<OutOfLineExecutor>;
+/**
+ * A GuaranteedExecutor is a wrapper that ensures its Tasks run exactly once.
+ *
+ * If a Task cannot be run, would be destructed without being run, or would run multiple times, it
+ * will trigger an invariant.
+ */
+class GuaranteedExecutor final : public OutOfLineExecutor {
+public:
+ explicit GuaranteedExecutor(ExecutorPtr exec) : _exec(std::move(exec)) {
+ invariant(_exec, kNoExecutorStr);
+ }
+
+ virtual ~GuaranteedExecutor() = default;
+
+ /**
+ * Return a wrapped task that is enforced to run once and only once.
+ */
+ static auto enforceRunOnce(Task&& task) noexcept {
+ return Task([task = std::move(task), guard = RunOnceGuard()](Status status) mutable {
+ invariant(status, kRejectedWorkStr);
+ guard.run();
+
+ auto localTask = std::exchange(task, {});
+ localTask(std::move(status));
+ });
+ }
+
+ void schedule(Task func) override {
+ // Make sure that the function will be called eventually, once.
+ auto sureFunc = enforceRunOnce(std::move(func));
+ _exec->schedule(std::move(sureFunc));
+ }
+
+private:
+ ExecutorPtr _exec;
+};
+
+/**
+ * A GuaranteedExecutorWithFallback is a wrapper that allows a preferred Executor to pass tasks to a
+ * fallback.
+ *
+ * The GuaranteedExecutorWithFallback uses its _fallback executor when _preferred invokes a Task
+ * with a not-okay Status. The _fallback executor is a GuaranteedExecutor wrapper, and thus must run
+ * Tasks under threat of invariant.
+ */
+class GuaranteedExecutorWithFallback final : public OutOfLineExecutor {
+public:
+ explicit GuaranteedExecutorWithFallback(ExecutorPtr preferred, ExecutorPtr fallback)
+ : _preferred(std::move(preferred)), _fallback(std::move(fallback)) {
+ invariant(_preferred, kNoExecutorStr);
+ // Fallback invariants via GuaranteedExecutor's constructor.
+ }
+
+ virtual ~GuaranteedExecutorWithFallback() = default;
+
+ void schedule(Task func) override {
+ _preferred->schedule([func = std::move(func), fallback = _fallback](Status status) mutable {
+ if (!status.isOK()) {
+ // This executor has rejected work, send it to the fallback.
+ fallback.schedule(std::move(func));
+ return;
+ }
+
+ // This executor has accepted work.
+ func(std::move(status));
+ });
+ }
+
+private:
+ ExecutorPtr _preferred;
+ GuaranteedExecutor _fallback;
+};
+
+/**
+ * Make a GuaranteedExecutor without a fallback.
+ *
+ * If exec is invalid, this function will invariant.
+ */
+inline ExecutorPtr makeGuaranteedExecutor(ExecutorPtr exec) noexcept {
+ // Note that each GuaranteedExecutor ctor invariants that the pointer is valid.
+ return std::make_shared<GuaranteedExecutor>(std::move(exec));
+}
+
+/**
+ * Make either a GuaranteedExecutor or a GuaranteedExecutorWithFallback.
+ *
+ * If preferred is invalid and fallback is valid, this creates a GuaranteedExecutor from fallback.
+ * If fallback is invalid and preferred is valid, this creates a GuaranteedExecutor from preferred.
+ * If both preferred and fallback are invalid, this function will invariant.
+ */
+inline ExecutorPtr makeGuaranteedExecutor(ExecutorPtr preferred, ExecutorPtr fallback) noexcept {
+ // Note that each GuaranteedExecutor ctor invariants that the pointer is valid.
+ if (!preferred) {
+ return makeGuaranteedExecutor(std::move(fallback));
+ }
+
+ if (!fallback) {
+ return makeGuaranteedExecutor(std::move(preferred));
+ }
+
+ return std::make_shared<GuaranteedExecutorWithFallback>(std::move(preferred),
+ std::move(fallback));
+}
+
} // namespace mongo
diff --git a/src/mongo/util/out_of_line_executor_test.cpp b/src/mongo/util/out_of_line_executor_test.cpp
new file mode 100644
index 00000000000..29fa1892720
--- /dev/null
+++ b/src/mongo/util/out_of_line_executor_test.cpp
@@ -0,0 +1,201 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/out_of_line_executor.h"
+
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/executor_test_util.h"
+
+namespace mongo {
+namespace {
+
+TEST(ExecutorTest, RejectingExecutor) {
+ // Verify that the executor rejects every time and keeps an accurate count.
+ const auto exec = RejectingExecutor::make();
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ exec->schedule([&](Status error) {
+ ASSERT_NOT_OK(error);
+ ASSERT_EQ(exec->tasksRejected.load(), (i + 1));
+ });
+ }
+}
+
+TEST(ExecutorTest, InlineCountingExecutor) {
+ // Verify that the executor accepts every time and keeps an accurate count.
+ const auto execA = InlineCountingExecutor::make();
+ const auto execB = InlineCountingExecutor::make();
+
+ // Using prime numbers so there is no chance of multiple traps
+ static constexpr size_t kCountA = 1013;
+ static constexpr size_t kCountB = 1511;
+
+ // Schedule kCountA tasks one at a time.
+ for (size_t i = 0; i < kCountA; ++i) {
+ execA->schedule([&](Status status) {
+ ASSERT_OK(status);
+ ASSERT_EQ(execA->tasksRun.load(), (i + 1));
+ });
+ }
+
+ {
+ // Schedule kCountB tasks recursively.
+ size_t i = 0;
+ std::function<void(Status)> recurseExec;
+ bool inTask = false;
+
+ recurseExec = [&](Status status) {
+ ASSERT(!std::exchange(inTask, true));
+ ASSERT_OK(status);
+
+ auto tasksRun = execB->tasksRun.load();
+ ASSERT_EQ(tasksRun, ++i);
+ if (tasksRun < kCountB) {
+ execB->schedule(recurseExec);
+ }
+
+ ASSERT(std::exchange(inTask, false));
+ };
+
+ execB->schedule(recurseExec);
+ }
+
+ // Verify that running executors together didn't change the expected counts.
+ ASSERT_EQ(execA->tasksRun.load(), kCountA);
+ ASSERT_EQ(execB->tasksRun.load(), kCountB);
+}
+
+DEATH_TEST(ExecutorTest,
+ GuaranteedExecutor_MainInvalid_FallbackInvalid,
+ GuaranteedExecutor::kNoExecutorStr) {
+ // If no executor was provided, then we invariant.
+ const auto gwarExec = makeGuaranteedExecutor({});
+}
+
+DEATH_TEST(ExecutorTest,
+ GuaranteedExecutor_MainInvalid_FallbackRejects,
+ GuaranteedExecutor::kRejectedWorkStr) {
+ // If we have a fallback and it rejects work, then we invariant.
+ const auto gwarExec = makeGuaranteedExecutor({}, RejectingExecutor::make());
+ gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); });
+}
+
+TEST(ExecutorTest, GuaranteedExecutor_MainInvalid_FallbackAccepts) {
+ // If we only have a fallback, then everything runs on it.
+ const auto countExec = InlineCountingExecutor::make();
+ const auto gwarExec = makeGuaranteedExecutor({}, countExec);
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ gwarExec->schedule([&](Status status) { ASSERT_OK(status); });
+ }
+
+ ASSERT_EQ(countExec->tasksRun.load(), kCount);
+}
+
+DEATH_TEST(ExecutorTest,
+ GuaranteedExecutor_MainRejects_FallbackInvalid,
+ GuaranteedExecutor::kRejectedWorkStr) {
+ // If we only have a main executor and it rejects work, then we invariant.
+ const auto gwarExec = makeGuaranteedExecutor(RejectingExecutor::make());
+ gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); });
+}
+
+DEATH_TEST(ExecutorTest,
+ GuaranteedExecutor_MainRejects_FallbackRejects,
+ GuaranteedExecutor::kRejectedWorkStr) {
+ // If we have a main and a fallback and both reject work, then we invariant.
+ const auto gwarExec =
+ makeGuaranteedExecutor(RejectingExecutor::make(), RejectingExecutor::make());
+ gwarExec->schedule([](Status) { FAIL("Nothing should run the actual callback"); });
+}
+
+TEST(ExecutorTest, GuaranteedExecutor_MainRejects_FallbackAccepts) {
+ // If the main rejects and the fallback accepts, then run on the fallback.
+ const auto rejectExec = RejectingExecutor::make();
+ const auto countExec = InlineCountingExecutor::make();
+ const auto gwarExec = makeGuaranteedExecutor(rejectExec, countExec);
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ gwarExec->schedule([&](Status status) { ASSERT_OK(status); });
+ }
+ ASSERT_EQ(rejectExec->tasksRejected.load(), kCount);
+ ASSERT_EQ(countExec->tasksRun.load(), kCount);
+}
+
+TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackInvalid) {
+ // If the main accepts and we don't have a fallback, then run on the main.
+ const auto countExec = InlineCountingExecutor::make();
+ const auto gwarExec = makeGuaranteedExecutor(countExec, {});
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ gwarExec->schedule([&](Status status) { ASSERT_OK(status); });
+ }
+
+ ASSERT_EQ(countExec->tasksRun.load(), kCount);
+}
+
+TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackRejects) {
+ // If the main accepts and the fallback would reject, then run on the main.
+ const auto countExec = InlineCountingExecutor::make();
+ const auto rejectExec = RejectingExecutor::make();
+ const auto gwarExec = makeGuaranteedExecutor(countExec, rejectExec);
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ gwarExec->schedule([&](Status status) { ASSERT_OK(status); });
+ }
+
+ ASSERT_EQ(countExec->tasksRun.load(), kCount);
+ ASSERT_EQ(rejectExec->tasksRejected.load(), 0);
+}
+
+TEST(ExecutorTest, GuaranteedExecutor_MainAccepts_FallbackAccepts) {
+ // If both executor accepts, then run on the main.
+ const auto countExecA = InlineCountingExecutor::make();
+ const auto countExecB = InlineCountingExecutor::make();
+ const auto gwarExec = makeGuaranteedExecutor(countExecA, countExecB);
+
+ static constexpr size_t kCount = 1000;
+ for (size_t i = 0; i < kCount; ++i) {
+ gwarExec->schedule([&](Status status) { ASSERT_OK(status); });
+ }
+
+ ASSERT_EQ(countExecA->tasksRun.load(), kCount);
+ ASSERT_EQ(countExecB->tasksRun.load(), 0);
+}
+
+} // namespace
+} // namespace mongo