summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/util/SConscript11
-rw-r--r--src/mongo/util/keyed_executor.h243
-rw-r--r--src/mongo/util/keyed_executor_test.cpp380
-rw-r--r--src/mongo/util/out_of_line_executor.h77
5 files changed, 712 insertions, 0 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 522a7b02c58..5e07230086a 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -256,6 +256,7 @@ error_code("FreeMonHttpPermanentFailure", 255)
error_code("TransactionCommitted", 256)
error_code("TransactionTooLarge", 257)
error_code("UnknownFeatureCompatibilityVersion", 258);
+error_code("KeyedExecutorRetry", 259);
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 10363c867d0..d83b7c86759 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -649,6 +649,17 @@ env.CppUnitTest(
],
)
+env.CppUnitTest(
+ target='keyed_executor_test',
+ source=[
+ 'keyed_executor_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ ],
+)
+
env.Benchmark(
target='future_bm',
source=[
diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h
new file mode 100644
index 00000000000..8449a9a5cce
--- /dev/null
+++ b/src/mongo/util/keyed_executor.h
@@ -0,0 +1,243 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+#include <vector>
+
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/future.h"
+#include "mongo/util/out_of_line_executor.h"
+
+namespace mongo {
+
+/**
+ * This is a thread safe execution primitive for running jobs against an executor with mutual
+ * exclusion and queuing by key.
+ *
+ * Features:
+ * Keyed - Tasks are submitted under a key. The keys serve to prevent tasks for a given key from
+ * executing simultaneously. Tasks submitted under different keys may run concurrently.
+ *
+ * Queued - If a task is submitted for a key and another task is already running for that key, it
+ * is queued. I.e. tasks are run in FIFO order for a key.
+ *
+ * Thread Safe - This is a thread safe type. Any number of callers may invoke the public api
+ * methods simultaneously.
+ *
+ * Special Enhancements:
+ * onCurrentTasksDrained- Invoking this method for a key allows a caller to wait until all of the
+ * currently queued tasks for that key have completed.
+ *
+ * onAllCurrentTasksDrained- Invoking this method allows a caller to wait until all of the
+ * currently queued tasks for all key have completed.
+ *
+ * KeyedExecutorRetry - Throwing or returning KeyedExecutorRetry in a task will cause the task to
+ * be requeued immediately into the executor and retain its place in the
+ * queue.
+ *
+ * The template arguments to the type include the Key we wish to schedule under, and arguments that
+ * are passed through to stdx::unordered_map (I.e. Hash, KeyEqual, Allocator, etc).
+ *
+ * It is a programming error to destroy this type with tasks still in the queue. Clean shutdown can
+ * be effected by ceasing to queue new work, running tasks which can fail early and waiting on
+ * onAllCurrentTasksDrained.
+ */
+template <typename Key, typename... MapArgs>
+class KeyedExecutor {
+ // We hold a deque per key. Each entry in the deque represents a task we'll eventually execute
+ // and a list of callers who need to be notified after it completes.
+ using Deque = std::deque<std::vector<SharedPromise<void>>>;
+
+ using Map = stdx::unordered_map<Key, Deque, MapArgs...>;
+
+public:
+ explicit KeyedExecutor(OutOfLineExecutor* executor) : _executor(executor) {}
+
+ KeyedExecutor(const KeyedExecutor&) = delete;
+ KeyedExecutor& operator=(const KeyedExecutor&) = delete;
+
+ KeyedExecutor(KeyedExecutor&&) = delete;
+ KeyedExecutor& operator=(KeyedExecutor&&) = delete;
+
+ ~KeyedExecutor() {
+ invariant(_map.empty());
+ }
+
+ /**
+ * Executes the callback on the associated executor. If another task is currently running for a
+ * given key, queues until that task is finished.
+ */
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> execute(const Key& key, Callback&& cb) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ typename Map::iterator iter;
+ bool wasInserted;
+ std::tie(iter, wasInserted) = _map.emplace(
+ std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple());
+
+ if (wasInserted) {
+ // If there wasn't a key, we're the first job, just run immediately
+ iter->second.emplace_back();
+
+ // Drop the lock before running execute to avoid deadlocks
+ lk.unlock();
+ return _execute(iter, std::forward<Callback>(cb));
+ }
+
+ // If there's already a key, we queue up our execution behind it
+ auto future =
+ _onCleared(lk, iter->second).then([this, iter, cb] { return _execute(iter, cb); });
+
+ // Create a new set of promises for callers who rely on our readiness
+ iter->second.emplace_back();
+
+ return future;
+ }
+
+ /**
+ * Returns a future which becomes ready when all queued tasks for a given key have completed.
+ *
+ * Note that this doesn't prevent other tasks from queueing and the readiness of this future
+ * says nothing about the execution of those tasks queued after this call.
+ */
+ Future<void> onCurrentTasksDrained(const Key& key) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto iter = _map.find(key);
+
+ if (iter == _map.end()) {
+ // If there wasn't a key, we're already cleared
+ return Future<void>::makeReady();
+ }
+
+ return _onCleared(lk, iter->second);
+ }
+
+ /**
+ * Returns a future which becomes ready when all queued tasks for all keys have completed.
+ *
+ * Note that this doesn't prevent other tasks from queueing and the readiness of this future
+ * says nothing about the execution of those tasks queued after this call.
+ */
+ Future<void> onAllCurrentTasksDrained() {
+ // This latch works around a current lack of whenAll. We have less need of a complicated
+ // type however (because our only failure mode is broken promise, a programming error here,
+ // and because we only need to handle void and can collapse).
+ struct Latch {
+ ~Latch() {
+ promise.emplaceValue();
+ }
+
+ Promise<void> promise;
+ };
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_map.empty()) {
+ // If there isn't any state, just return
+ return Future<void>::makeReady();
+ }
+
+ // We rely on shard_ptr to handle the atomic refcounting before emplacing for us.
+ auto latch = std::make_shared<Latch>();
+ auto future = latch->promise.getFuture();
+
+ for (auto& pair : _map) {
+ _onCleared(lk, pair.second).getAsync([latch](const Status& status) mutable {
+ invariant(status);
+ latch.reset();
+ });
+ }
+
+ return future;
+ }
+
+private:
+ /**
+ * executes and retries if the callback throws/returns KeyedExecutorRetry
+ */
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> _executeRetryErrors(Callback&& cb) {
+ return _executor->execute(std::forward<Callback>(cb))
+ .onError([this, cb](const Status& status) {
+ if (status.code() == ErrorCodes::KeyedExecutorRetry) {
+ return _executeRetryErrors(cb);
+ }
+
+ return Future<FutureContinuationResult<Callback>>(status);
+ });
+ };
+
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> _execute(typename Map::iterator iter,
+ Callback&& cb) {
+ // First we run until success, or non retry-able error
+ return _executeRetryErrors(std::forward<Callback>(cb)).tapAll([this, iter](const auto&) {
+ // Then handle clean up
+ auto promises = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ auto& deque = iter->second;
+ auto promises = std::move(deque.front());
+ deque.pop_front();
+
+ if (deque.empty()) {
+ _map.erase(iter);
+ }
+
+ return promises;
+ }();
+
+ // fulfill promises outside the lock
+ for (auto& promise : promises) {
+ promise.emplaceValue();
+ }
+ });
+ }
+
+ Future<void> _onCleared(WithLock, Deque& deque) {
+ invariant(deque.size());
+
+ Promise<void> promise;
+ Future<void> future = promise.getFuture();
+
+ deque.back().push_back(promise.share());
+
+ return future;
+ }
+
+ stdx::mutex _mutex;
+ Map _map;
+ OutOfLineExecutor* _executor;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp
new file mode 100644
index 00000000000..c050792c2e3
--- /dev/null
+++ b/src/mongo/util/keyed_executor_test.cpp
@@ -0,0 +1,380 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/keyed_executor.h"
+
+#include <random>
+#include <string>
+#include <vector>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace {
+
+class MockExecutor : public OutOfLineExecutor {
+public:
+ void schedule(stdx::function<void()> func) override {
+ _deque.push_front(std::move(func));
+ }
+
+ size_t depth() const {
+ return _deque.size();
+ }
+
+ bool runOne() {
+ if (_deque.empty()) {
+ return false;
+ }
+
+ auto x = std::move(_deque.back());
+ _deque.pop_back();
+ x();
+
+ return true;
+ }
+
+ void runAll() {
+ while (runOne()) {
+ }
+ }
+
+private:
+ std::deque<stdx::function<void()>> _deque;
+};
+
+class ThreadPoolExecutor : public OutOfLineExecutor {
+public:
+ ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {}
+
+ void start() {
+ _threadPool.startup();
+ }
+
+ void shutdown() {
+ _threadPool.shutdown();
+ }
+
+ void schedule(stdx::function<void()> func) override {
+ ASSERT_OK(_threadPool.schedule(std::move(func)));
+ }
+
+private:
+ ThreadPool _threadPool;
+};
+
+TEST(KeyedExecutor, basicExecute) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ auto run1 = ke.execute("foo", [] { return 1; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+
+ auto run2 = ke.execute("foo", [] { return 2; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_EQUALS(run1.get(), 1);
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT_EQUALS(run2.get(), 2);
+}
+
+TEST(KeyedExecutor, differentKeysDontConflict) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ auto foo = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(foo.isReady());
+
+ auto bar = ke.execute("bar", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(foo.isReady());
+ ASSERT_FALSE(bar.isReady());
+
+ me.runAll();
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT(foo.get());
+ ASSERT(bar.get());
+}
+
+TEST(KeyedExecutor, onCurrentTasksDrained) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ auto run1 = ke.execute("foo", [] { return true; });
+ auto bar = ke.execute("bar", [] { return true; });
+ auto onBarDone = ke.onCurrentTasksDrained("bar");
+ auto onRun1Done = ke.onCurrentTasksDrained("foo");
+ auto run2 = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(onRun1Done.isReady());
+ ASSERT_FALSE(bar.isReady());
+ ASSERT_FALSE(onBarDone.isReady());
+
+ auto onRun2Done = ke.onCurrentTasksDrained("foo");
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(onRun1Done.isReady());
+ ASSERT_FALSE(onRun2Done.isReady());
+ ASSERT_FALSE(bar.isReady());
+ ASSERT_FALSE(onBarDone.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT(run1.get());
+ ASSERT_OK(onRun1Done.getNoThrow());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(onRun2Done.isReady());
+ ASSERT_FALSE(bar.isReady());
+ ASSERT_FALSE(onBarDone.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT(bar.get());
+ ASSERT_OK(onBarDone.getNoThrow());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT(run2.get());
+ ASSERT_OK(onRun2Done.getNoThrow());
+}
+
+TEST(KeyedExecutor, onAllCurrentTasksDrained) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ auto run1 = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+
+ auto run2 = ke.execute("bar", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+
+ auto onAllDone = ke.onAllCurrentTasksDrained();
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(onAllDone.isReady());
+
+ auto run3 = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(run3.isReady());
+ ASSERT_FALSE(onAllDone.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 2ul);
+ ASSERT(run1.get());
+ ASSERT_FALSE(run2.isReady());
+ ASSERT_FALSE(onAllDone.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT(run2.get());
+ ASSERT_OK(onAllDone.getNoThrow());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT(run3.get());
+}
+
+TEST(KeyedExecutor, onCurrentTasksDrainedEmpty) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ ASSERT_OK(ke.onCurrentTasksDrained("foo").getNoThrow());
+}
+
+TEST(KeyedExecutor, onAllCurrentTasksDrainedEmpty) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ ASSERT_OK(ke.onAllCurrentTasksDrained().getNoThrow());
+}
+
+TEST(KeyedExecutor, retriesFailureWithSpecialCode) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ int count = 2;
+
+ auto run1 = ke.execute("foo", [&count] {
+ if (--count) {
+ uasserted(ErrorCodes::KeyedExecutorRetry, "force a retry");
+ }
+
+ return true;
+ });
+
+ auto run2 = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(count, 1);
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(count, 0);
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT(run1.get());
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT(run2.get());
+}
+
+TEST(KeyedExecutor, doesntRetryFailureWithoutSpecialCode) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ auto run1 = ke.execute("foo", [] { uasserted(ErrorCodes::BadValue, "some other code"); });
+
+ auto run2 = ke.execute("foo", [] { return true; });
+
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_FALSE(run1.isReady());
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 1ul);
+ ASSERT_THROWS_CODE(run1.get(), DBException, ErrorCodes::BadValue);
+ ASSERT_FALSE(run2.isReady());
+
+ ASSERT(me.runOne());
+ ASSERT_EQUALS(me.depth(), 0ul);
+ ASSERT(run2.get());
+}
+
+TEST(KeyedExecutor, gracefulShutdown) {
+ MockExecutor me;
+ KeyedExecutor<std::string> ke(&me);
+
+ Status status = Status::OK();
+
+ auto adaptForInShutdown = [&](auto&& cb) {
+ return [&] {
+ uassertStatusOK(status);
+ cb();
+ };
+ };
+
+ auto run = ke.execute("foo", adaptForInShutdown([] {}));
+ auto onRunDone = ke.onCurrentTasksDrained("foo");
+ auto onAllRunDone = ke.onAllCurrentTasksDrained();
+
+ status = Status(ErrorCodes::InterruptedAtShutdown, "shutting down");
+ me.runAll();
+
+ ASSERT_THROWS_CODE(run.get(), DBException, ErrorCodes::InterruptedAtShutdown);
+ onRunDone.get();
+ onAllRunDone.get();
+}
+
+TEST(KeyedExecutor, withThreadsTest) {
+ ThreadPoolExecutor tpe;
+ KeyedExecutor<int> ke(&tpe);
+ tpe.start();
+
+ constexpr size_t n = (1 << 16);
+
+ stdx::mutex mutex;
+ stdx::condition_variable condvar;
+ size_t counter = 0;
+
+ auto incCounter = [&](auto&&) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ counter++;
+
+ if (counter == n) {
+ condvar.notify_one();
+ }
+ };
+
+ std::mt19937 gen(1);
+ std::uniform_int_distribution<> keyDistribution(1, 3);
+ std::uniform_int_distribution<> actionDistribution(1, 100);
+
+ for (size_t i = 0; i < n; ++i) {
+ auto action = actionDistribution(gen);
+
+ if (action <= 65) {
+ ke.execute(keyDistribution(gen), [] {
+ stdx::this_thread::yield();
+ }).getAsync(incCounter);
+ } else if (action <= 90) {
+ ke.onCurrentTasksDrained(keyDistribution(gen)).getAsync(incCounter);
+ } else {
+ ke.onAllCurrentTasksDrained().getAsync(incCounter);
+ }
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ condvar.wait(lk, [&] { return counter == n; });
+
+ tpe.shutdown();
+
+ ASSERT_EQUALS(counter, n);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
new file mode 100644
index 00000000000..e162a4f8347
--- /dev/null
+++ b/src/mongo/util/out_of_line_executor.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/stdx/functional.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+/**
+ * Provides the minimal api for a simple out of line executor that can run non-cancellable
+ * callbacks.
+ *
+ * Adds in a minimal amount of support for futures.
+ *
+ * The contract for scheduling work on an executor is that it never blocks the caller. It doesn't
+ * necessarily need to offer forward progress guarantees, but actual calls to schedule() should not
+ * deadlock.
+ *
+ * As an explicit point of implementation: it will never invoke the passed callback from within the
+ * scheduling call.
+ */
+class OutOfLineExecutor {
+public:
+ /**
+ * Invokes the callback on the executor, as in schedule(), returning a future with its result.
+ * That future may be ready by the time the caller returns, which means that continuations
+ * chained on the returned future may be invoked on the caller of execute's stack.
+ */
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
+ Promise<FutureContinuationResult<Callback>> promise;
+ auto future = promise.getFuture();
+
+ schedule([ cb = std::forward<Callback>(cb), sp = promise.share() ]() mutable {
+ sp.setWith(std::move(cb));
+ });
+
+ return future;
+ }
+
+ /**
+ * Invokes the callback on the executor. This never happens immediately on the caller's stack.
+ */
+ virtual void schedule(stdx::function<void()> func) = 0;
+
+protected:
+ ~OutOfLineExecutor() noexcept {}
+};
+
+} // namespace mongo