summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2019-04-11 13:15:13 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-14 17:22:06 +0000
commitdaf92dae320330eb7ff74a07f3855f0aa6da2baf (patch)
tree86d3ce8504f3de86ac39d38f98d0e760cb3c4ae1
parent3526e237f3ad7cd30f6f9edad4d57cd06ad2d22d (diff)
downloadmongo-daf92dae320330eb7ff74a07f3855f0aa6da2baf.tar.gz
SERVER-40590 delete KeyedExecutor
-rw-r--r--src/mongo/util/SConscript11
-rw-r--r--src/mongo/util/keyed_executor.h244
-rw-r--r--src/mongo/util/keyed_executor_test.cpp382
3 files changed, 0 insertions, 637 deletions
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 2456eecf5da..b793462ca12 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -702,17 +702,6 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='keyed_executor_test',
- source=[
- 'keyed_executor_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/util/concurrency/thread_pool',
- ],
-)
-
-env.CppUnitTest(
target='strong_weak_finish_line_test',
source=[
'strong_weak_finish_line_test.cpp'
diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h
deleted file mode 100644
index 093d3a30b7c..00000000000
--- a/src/mongo/util/keyed_executor.h
+++ /dev/null
@@ -1,244 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <deque>
-#include <vector>
-
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/future.h"
-#include "mongo/util/out_of_line_executor.h"
-
-namespace mongo {
-
-/**
- * This is a thread safe execution primitive for running jobs against an executor with mutual
- * exclusion and queuing by key.
- *
- * Features:
- * Keyed - Tasks are submitted under a key. The keys serve to prevent tasks for a given key from
- * executing simultaneously. Tasks submitted under different keys may run concurrently.
- *
- * Queued - If a task is submitted for a key and another task is already running for that key, it
- * is queued. I.e. tasks are run in FIFO order for a key.
- *
- * Thread Safe - This is a thread safe type. Any number of callers may invoke the public api
- * methods simultaneously.
- *
- * Special Enhancements:
- * onCurrentTasksDrained- Invoking this method for a key allows a caller to wait until all of the
- * currently queued tasks for that key have completed.
- *
- * onAllCurrentTasksDrained- Invoking this method allows a caller to wait until all of the
- * currently queued tasks for all key have completed.
- *
- * KeyedExecutorRetry - Throwing or returning KeyedExecutorRetry in a task will cause the task to
- * be requeued immediately into the executor and retain its place in the
- * queue.
- *
- * The template arguments to the type include the Key we wish to schedule under, and arguments that
- * are passed through to stdx::unordered_map (I.e. Hash, KeyEqual, Allocator, etc).
- *
- * It is a programming error to destroy this type with tasks still in the queue. Clean shutdown can
- * be effected by ceasing to queue new work, running tasks which can fail early and waiting on
- * onAllCurrentTasksDrained.
- */
-template <typename Key, typename... MapArgs>
-class KeyedExecutor {
- // We hold a deque per key. Each entry in the deque represents a task we'll eventually execute
- // and a list of callers who need to be notified after it completes.
- using Deque = std::deque<std::vector<SharedPromise<void>>>;
-
- using Map = stdx::unordered_map<Key, Deque, MapArgs...>;
-
-public:
- explicit KeyedExecutor(OutOfLineExecutor* executor) : _executor(executor) {}
-
- KeyedExecutor(const KeyedExecutor&) = delete;
- KeyedExecutor& operator=(const KeyedExecutor&) = delete;
-
- KeyedExecutor(KeyedExecutor&&) = delete;
- KeyedExecutor& operator=(KeyedExecutor&&) = delete;
-
- ~KeyedExecutor() {
- invariant(_map.empty());
- }
-
- /**
- * Executes the callback on the associated executor. If another task is currently running for a
- * given key, queues until that task is finished.
- */
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> execute(const Key& key, Callback&& cb) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- typename Map::iterator iter;
- bool wasInserted;
- std::tie(iter, wasInserted) = _map.emplace(
- std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple());
-
- if (wasInserted) {
- // If there wasn't a key, we're the first job, just run immediately
- iter->second.emplace_back();
-
- // Drop the lock before running execute to avoid deadlocks
- lk.unlock();
- return _execute(iter, std::forward<Callback>(cb));
- }
-
- // If there's already a key, we queue up our execution behind it
- auto future =
- _onCleared(lk, iter->second).then([this, iter, cb] { return _execute(iter, cb); });
-
- // Create a new set of promises for callers who rely on our readiness
- iter->second.emplace_back();
-
- return future;
- }
-
- /**
- * Returns a future which becomes ready when all queued tasks for a given key have completed.
- *
- * Note that this doesn't prevent other tasks from queueing and the readiness of this future
- * says nothing about the execution of those tasks queued after this call.
- */
- Future<void> onCurrentTasksDrained(const Key& key) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto iter = _map.find(key);
-
- if (iter == _map.end()) {
- // If there wasn't a key, we're already cleared
- return Future<void>::makeReady();
- }
-
- return _onCleared(lk, iter->second);
- }
-
- /**
- * Returns a future which becomes ready when all queued tasks for all keys have completed.
- *
- * Note that this doesn't prevent other tasks from queueing and the readiness of this future
- * says nothing about the execution of those tasks queued after this call.
- */
- Future<void> onAllCurrentTasksDrained() {
- // This latch works around a current lack of whenAll. We have less need of a complicated
- // type however (because our only failure mode is broken promise, a programming error here,
- // and because we only need to handle void and can collapse).
- struct Latch {
- ~Latch() {
- promise.emplaceValue();
- }
-
- explicit Latch(Promise<void> p) : promise(std::move(p)) {}
-
- Promise<void> promise;
- };
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_map.empty()) {
- // If there isn't any state, just return
- return Future<void>::makeReady();
- }
-
- auto pf = makePromiseFuture<void>();
- // We rely on shard_ptr to handle the atomic refcounting before emplacing for us.
- auto latch = std::make_shared<Latch>(std::move(pf.promise));
- auto future = std::move(pf.future);
-
- for (auto& pair : _map) {
- _onCleared(lk, pair.second).getAsync([latch](const Status& status) mutable {
- invariant(status);
- latch.reset();
- });
- }
-
- return future;
- }
-
-private:
- /**
- * executes and retries if the callback throws/returns KeyedExecutorRetry
- */
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> _executeRetryErrors(Callback&& cb) {
- return _executor->execute(std::forward<Callback>(cb))
- .onError([this, cb](const Status& status) {
- if (status.code() == ErrorCodes::KeyedExecutorRetry) {
- return _executeRetryErrors(cb);
- }
-
- return Future<FutureContinuationResult<Callback>>(status);
- });
- };
-
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> _execute(typename Map::iterator iter,
- Callback&& cb) {
- // First we run until success, or non retry-able error
- return _executeRetryErrors(std::forward<Callback>(cb)).tapAll([this, iter](const auto&) {
- // Then handle clean up
- auto promises = [&] {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- auto& deque = iter->second;
- auto promises = std::move(deque.front());
- deque.pop_front();
-
- if (deque.empty()) {
- _map.erase(iter);
- }
-
- return promises;
- }();
-
- // fulfill promises outside the lock
- for (auto& promise : promises) {
- promise.emplaceValue();
- }
- });
- }
-
- Future<void> _onCleared(WithLock, Deque& deque) {
- invariant(deque.size());
- auto pf = makePromiseFuture<void>();
- deque.back().push_back(pf.promise.share());
- return std::move(pf.future);
- }
-
- stdx::mutex _mutex;
- Map _map;
- OutOfLineExecutor* _executor;
-};
-
-} // namespace mongo
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp
deleted file mode 100644
index e44bf934d7f..00000000000
--- a/src/mongo/util/keyed_executor_test.cpp
+++ /dev/null
@@ -1,382 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/util/keyed_executor.h"
-
-#include <random>
-#include <string>
-#include <vector>
-
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/thread_pool.h"
-
-namespace mongo {
-namespace {
-
-class MockExecutor : public OutOfLineExecutor {
-public:
- void schedule(stdx::function<void()> func) override {
- _deque.push_front(std::move(func));
- }
-
- size_t depth() const {
- return _deque.size();
- }
-
- bool runOne() {
- if (_deque.empty()) {
- return false;
- }
-
- auto x = std::move(_deque.back());
- _deque.pop_back();
- x();
-
- return true;
- }
-
- void runAll() {
- while (runOne()) {
- }
- }
-
-private:
- std::deque<stdx::function<void()>> _deque;
-};
-
-class ThreadPoolExecutor : public OutOfLineExecutor {
-public:
- ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {}
-
- void start() {
- _threadPool.startup();
- }
-
- void shutdown() {
- _threadPool.shutdown();
- }
-
- void schedule(stdx::function<void()> func) override {
- ASSERT_OK(_threadPool.schedule(std::move(func)));
- }
-
-private:
- ThreadPool _threadPool;
-};
-
-TEST(KeyedExecutor, basicExecute) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- auto run1 = ke.execute("foo", [] { return 1; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
-
- auto run2 = ke.execute("foo", [] { return 2; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_EQUALS(run1.get(), 1);
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT_EQUALS(run2.get(), 2);
-}
-
-TEST(KeyedExecutor, differentKeysDontConflict) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- auto foo = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(foo.isReady());
-
- auto bar = ke.execute("bar", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(foo.isReady());
- ASSERT_FALSE(bar.isReady());
-
- me.runAll();
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT(foo.get());
- ASSERT(bar.get());
-}
-
-TEST(KeyedExecutor, onCurrentTasksDrained) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- auto run1 = ke.execute("foo", [] { return true; });
- auto bar = ke.execute("bar", [] { return true; });
- auto onBarDone = ke.onCurrentTasksDrained("bar");
- auto onRun1Done = ke.onCurrentTasksDrained("foo");
- auto run2 = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(onRun1Done.isReady());
- ASSERT_FALSE(bar.isReady());
- ASSERT_FALSE(onBarDone.isReady());
-
- auto onRun2Done = ke.onCurrentTasksDrained("foo");
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(onRun1Done.isReady());
- ASSERT_FALSE(onRun2Done.isReady());
- ASSERT_FALSE(bar.isReady());
- ASSERT_FALSE(onBarDone.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT(run1.get());
- ASSERT_OK(onRun1Done.getNoThrow());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(onRun2Done.isReady());
- ASSERT_FALSE(bar.isReady());
- ASSERT_FALSE(onBarDone.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT(bar.get());
- ASSERT_OK(onBarDone.getNoThrow());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT(run2.get());
- ASSERT_OK(onRun2Done.getNoThrow());
-}
-
-TEST(KeyedExecutor, onAllCurrentTasksDrained) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- auto run1 = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
-
- auto run2 = ke.execute("bar", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
-
- auto onAllDone = ke.onAllCurrentTasksDrained();
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(onAllDone.isReady());
-
- auto run3 = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(run3.isReady());
- ASSERT_FALSE(onAllDone.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 2ul);
- ASSERT(run1.get());
- ASSERT_FALSE(run2.isReady());
- ASSERT_FALSE(onAllDone.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT(run2.get());
- ASSERT_OK(onAllDone.getNoThrow());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT(run3.get());
-}
-
-TEST(KeyedExecutor, onCurrentTasksDrainedEmpty) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- ASSERT_OK(ke.onCurrentTasksDrained("foo").getNoThrow());
-}
-
-TEST(KeyedExecutor, onAllCurrentTasksDrainedEmpty) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- ASSERT_OK(ke.onAllCurrentTasksDrained().getNoThrow());
-}
-
-TEST(KeyedExecutor, retriesFailureWithSpecialCode) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- int count = 2;
-
- auto run1 = ke.execute("foo", [&count] {
- if (--count) {
- uasserted(ErrorCodes::KeyedExecutorRetry, "force a retry");
- }
-
- return true;
- });
-
- auto run2 = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(count, 1);
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(count, 0);
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT(run1.get());
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT(run2.get());
-}
-
-TEST(KeyedExecutor, doesntRetryFailureWithoutSpecialCode) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- auto run1 = ke.execute("foo", [] { uasserted(ErrorCodes::BadValue, "some other code"); });
-
- auto run2 = ke.execute("foo", [] { return true; });
-
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_FALSE(run1.isReady());
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 1ul);
- ASSERT_THROWS_CODE(run1.get(), DBException, ErrorCodes::BadValue);
- ASSERT_FALSE(run2.isReady());
-
- ASSERT(me.runOne());
- ASSERT_EQUALS(me.depth(), 0ul);
- ASSERT(run2.get());
-}
-
-TEST(KeyedExecutor, gracefulShutdown) {
- MockExecutor me;
- KeyedExecutor<std::string> ke(&me);
-
- Status status = Status::OK();
-
- auto adaptForInShutdown = [&](auto&& cb) {
- return [&] {
- uassertStatusOK(status);
- cb();
- };
- };
-
- auto run = ke.execute("foo", adaptForInShutdown([] {}));
- auto onRunDone = ke.onCurrentTasksDrained("foo");
- auto onAllRunDone = ke.onAllCurrentTasksDrained();
-
- status = Status(ErrorCodes::InterruptedAtShutdown, "shutting down");
- me.runAll();
-
- ASSERT_THROWS_CODE(run.get(), DBException, ErrorCodes::InterruptedAtShutdown);
- onRunDone.get();
- onAllRunDone.get();
-}
-
-TEST(KeyedExecutor, withThreadsTest) {
- ThreadPoolExecutor tpe;
- KeyedExecutor<int> ke(&tpe);
- tpe.start();
-
- constexpr size_t n = (1 << 16);
-
- stdx::mutex mutex;
- stdx::condition_variable condvar;
- size_t counter = 0;
-
- auto incCounter = [&](auto&&) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- counter++;
-
- if (counter == n) {
- condvar.notify_one();
- }
- };
-
- std::mt19937 gen(1);
- std::uniform_int_distribution<> keyDistribution(1, 3);
- std::uniform_int_distribution<> actionDistribution(1, 100);
-
- for (size_t i = 0; i < n; ++i) {
- auto action = actionDistribution(gen);
-
- if (action <= 65) {
- ke.execute(keyDistribution(gen), [] {
- stdx::this_thread::yield();
- }).getAsync(incCounter);
- } else if (action <= 90) {
- ke.onCurrentTasksDrained(keyDistribution(gen)).getAsync(incCounter);
- } else {
- ke.onAllCurrentTasksDrained().getAsync(incCounter);
- }
- }
-
- stdx::unique_lock<stdx::mutex> lk(mutex);
- condvar.wait(lk, [&] { return counter == n; });
-
- tpe.shutdown();
-
- ASSERT_EQUALS(counter, n);
-}
-
-} // namespace
-} // namespace mongo