diff options
author | Mathias Stearn <mathias@10gen.com> | 2019-04-11 13:15:13 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-14 17:22:06 +0000 |
commit | daf92dae320330eb7ff74a07f3855f0aa6da2baf (patch) | |
tree | 86d3ce8504f3de86ac39d38f98d0e760cb3c4ae1 | |
parent | 3526e237f3ad7cd30f6f9edad4d57cd06ad2d22d (diff) | |
download | mongo-daf92dae320330eb7ff74a07f3855f0aa6da2baf.tar.gz |
SERVER-40590 delete KeyedExecutor
-rw-r--r-- | src/mongo/util/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/util/keyed_executor.h | 244 | ||||
-rw-r--r-- | src/mongo/util/keyed_executor_test.cpp | 382 |
3 files changed, 0 insertions, 637 deletions
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 2456eecf5da..b793462ca12 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -702,17 +702,6 @@ env.CppUnitTest( ) env.CppUnitTest( - target='keyed_executor_test', - source=[ - 'keyed_executor_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/util/concurrency/thread_pool', - ], -) - -env.CppUnitTest( target='strong_weak_finish_line_test', source=[ 'strong_weak_finish_line_test.cpp' diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h deleted file mode 100644 index 093d3a30b7c..00000000000 --- a/src/mongo/util/keyed_executor.h +++ /dev/null @@ -1,244 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <deque> -#include <vector> - -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/future.h" -#include "mongo/util/out_of_line_executor.h" - -namespace mongo { - -/** - * This is a thread safe execution primitive for running jobs against an executor with mutual - * exclusion and queuing by key. - * - * Features: - * Keyed - Tasks are submitted under a key. The keys serve to prevent tasks for a given key from - * executing simultaneously. Tasks submitted under different keys may run concurrently. - * - * Queued - If a task is submitted for a key and another task is already running for that key, it - * is queued. I.e. tasks are run in FIFO order for a key. - * - * Thread Safe - This is a thread safe type. Any number of callers may invoke the public api - * methods simultaneously. - * - * Special Enhancements: - * onCurrentTasksDrained- Invoking this method for a key allows a caller to wait until all of the - * currently queued tasks for that key have completed. - * - * onAllCurrentTasksDrained- Invoking this method allows a caller to wait until all of the - * currently queued tasks for all key have completed. - * - * KeyedExecutorRetry - Throwing or returning KeyedExecutorRetry in a task will cause the task to - * be requeued immediately into the executor and retain its place in the - * queue. - * - * The template arguments to the type include the Key we wish to schedule under, and arguments that - * are passed through to stdx::unordered_map (I.e. Hash, KeyEqual, Allocator, etc). - * - * It is a programming error to destroy this type with tasks still in the queue. Clean shutdown can - * be effected by ceasing to queue new work, running tasks which can fail early and waiting on - * onAllCurrentTasksDrained. - */ -template <typename Key, typename... MapArgs> -class KeyedExecutor { - // We hold a deque per key. Each entry in the deque represents a task we'll eventually execute - // and a list of callers who need to be notified after it completes. - using Deque = std::deque<std::vector<SharedPromise<void>>>; - - using Map = stdx::unordered_map<Key, Deque, MapArgs...>; - -public: - explicit KeyedExecutor(OutOfLineExecutor* executor) : _executor(executor) {} - - KeyedExecutor(const KeyedExecutor&) = delete; - KeyedExecutor& operator=(const KeyedExecutor&) = delete; - - KeyedExecutor(KeyedExecutor&&) = delete; - KeyedExecutor& operator=(KeyedExecutor&&) = delete; - - ~KeyedExecutor() { - invariant(_map.empty()); - } - - /** - * Executes the callback on the associated executor. If another task is currently running for a - * given key, queues until that task is finished. - */ - template <typename Callback> - Future<FutureContinuationResult<Callback>> execute(const Key& key, Callback&& cb) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - typename Map::iterator iter; - bool wasInserted; - std::tie(iter, wasInserted) = _map.emplace( - std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple()); - - if (wasInserted) { - // If there wasn't a key, we're the first job, just run immediately - iter->second.emplace_back(); - - // Drop the lock before running execute to avoid deadlocks - lk.unlock(); - return _execute(iter, std::forward<Callback>(cb)); - } - - // If there's already a key, we queue up our execution behind it - auto future = - _onCleared(lk, iter->second).then([this, iter, cb] { return _execute(iter, cb); }); - - // Create a new set of promises for callers who rely on our readiness - iter->second.emplace_back(); - - return future; - } - - /** - * Returns a future which becomes ready when all queued tasks for a given key have completed. - * - * Note that this doesn't prevent other tasks from queueing and the readiness of this future - * says nothing about the execution of those tasks queued after this call. - */ - Future<void> onCurrentTasksDrained(const Key& key) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - auto iter = _map.find(key); - - if (iter == _map.end()) { - // If there wasn't a key, we're already cleared - return Future<void>::makeReady(); - } - - return _onCleared(lk, iter->second); - } - - /** - * Returns a future which becomes ready when all queued tasks for all keys have completed. - * - * Note that this doesn't prevent other tasks from queueing and the readiness of this future - * says nothing about the execution of those tasks queued after this call. - */ - Future<void> onAllCurrentTasksDrained() { - // This latch works around a current lack of whenAll. We have less need of a complicated - // type however (because our only failure mode is broken promise, a programming error here, - // and because we only need to handle void and can collapse). - struct Latch { - ~Latch() { - promise.emplaceValue(); - } - - explicit Latch(Promise<void> p) : promise(std::move(p)) {} - - Promise<void> promise; - }; - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_map.empty()) { - // If there isn't any state, just return - return Future<void>::makeReady(); - } - - auto pf = makePromiseFuture<void>(); - // We rely on shard_ptr to handle the atomic refcounting before emplacing for us. - auto latch = std::make_shared<Latch>(std::move(pf.promise)); - auto future = std::move(pf.future); - - for (auto& pair : _map) { - _onCleared(lk, pair.second).getAsync([latch](const Status& status) mutable { - invariant(status); - latch.reset(); - }); - } - - return future; - } - -private: - /** - * executes and retries if the callback throws/returns KeyedExecutorRetry - */ - template <typename Callback> - Future<FutureContinuationResult<Callback>> _executeRetryErrors(Callback&& cb) { - return _executor->execute(std::forward<Callback>(cb)) - .onError([this, cb](const Status& status) { - if (status.code() == ErrorCodes::KeyedExecutorRetry) { - return _executeRetryErrors(cb); - } - - return Future<FutureContinuationResult<Callback>>(status); - }); - }; - - template <typename Callback> - Future<FutureContinuationResult<Callback>> _execute(typename Map::iterator iter, - Callback&& cb) { - // First we run until success, or non retry-able error - return _executeRetryErrors(std::forward<Callback>(cb)).tapAll([this, iter](const auto&) { - // Then handle clean up - auto promises = [&] { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - auto& deque = iter->second; - auto promises = std::move(deque.front()); - deque.pop_front(); - - if (deque.empty()) { - _map.erase(iter); - } - - return promises; - }(); - - // fulfill promises outside the lock - for (auto& promise : promises) { - promise.emplaceValue(); - } - }); - } - - Future<void> _onCleared(WithLock, Deque& deque) { - invariant(deque.size()); - auto pf = makePromiseFuture<void>(); - deque.back().push_back(pf.promise.share()); - return std::move(pf.future); - } - - stdx::mutex _mutex; - Map _map; - OutOfLineExecutor* _executor; -}; - -} // namespace mongo diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp deleted file mode 100644 index e44bf934d7f..00000000000 --- a/src/mongo/util/keyed_executor_test.cpp +++ /dev/null @@ -1,382 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/util/keyed_executor.h" - -#include <random> -#include <string> -#include <vector> - -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/thread.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { -namespace { - -class MockExecutor : public OutOfLineExecutor { -public: - void schedule(stdx::function<void()> func) override { - _deque.push_front(std::move(func)); - } - - size_t depth() const { - return _deque.size(); - } - - bool runOne() { - if (_deque.empty()) { - return false; - } - - auto x = std::move(_deque.back()); - _deque.pop_back(); - x(); - - return true; - } - - void runAll() { - while (runOne()) { - } - } - -private: - std::deque<stdx::function<void()>> _deque; -}; - -class ThreadPoolExecutor : public OutOfLineExecutor { -public: - ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {} - - void start() { - _threadPool.startup(); - } - - void shutdown() { - _threadPool.shutdown(); - } - - void schedule(stdx::function<void()> func) override { - ASSERT_OK(_threadPool.schedule(std::move(func))); - } - -private: - ThreadPool _threadPool; -}; - -TEST(KeyedExecutor, basicExecute) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - auto run1 = ke.execute("foo", [] { return 1; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - - auto run2 = ke.execute("foo", [] { return 2; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_EQUALS(run1.get(), 1); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT_EQUALS(run2.get(), 2); -} - -TEST(KeyedExecutor, differentKeysDontConflict) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - auto foo = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(foo.isReady()); - - auto bar = ke.execute("bar", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(foo.isReady()); - ASSERT_FALSE(bar.isReady()); - - me.runAll(); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT(foo.get()); - ASSERT(bar.get()); -} - -TEST(KeyedExecutor, onCurrentTasksDrained) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - auto run1 = ke.execute("foo", [] { return true; }); - auto bar = ke.execute("bar", [] { return true; }); - auto onBarDone = ke.onCurrentTasksDrained("bar"); - auto onRun1Done = ke.onCurrentTasksDrained("foo"); - auto run2 = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(onRun1Done.isReady()); - ASSERT_FALSE(bar.isReady()); - ASSERT_FALSE(onBarDone.isReady()); - - auto onRun2Done = ke.onCurrentTasksDrained("foo"); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(onRun1Done.isReady()); - ASSERT_FALSE(onRun2Done.isReady()); - ASSERT_FALSE(bar.isReady()); - ASSERT_FALSE(onBarDone.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT(run1.get()); - ASSERT_OK(onRun1Done.getNoThrow()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(onRun2Done.isReady()); - ASSERT_FALSE(bar.isReady()); - ASSERT_FALSE(onBarDone.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT(bar.get()); - ASSERT_OK(onBarDone.getNoThrow()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT(run2.get()); - ASSERT_OK(onRun2Done.getNoThrow()); -} - -TEST(KeyedExecutor, onAllCurrentTasksDrained) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - auto run1 = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - - auto run2 = ke.execute("bar", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - - auto onAllDone = ke.onAllCurrentTasksDrained(); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(onAllDone.isReady()); - - auto run3 = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(run3.isReady()); - ASSERT_FALSE(onAllDone.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 2ul); - ASSERT(run1.get()); - ASSERT_FALSE(run2.isReady()); - ASSERT_FALSE(onAllDone.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT(run2.get()); - ASSERT_OK(onAllDone.getNoThrow()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT(run3.get()); -} - -TEST(KeyedExecutor, onCurrentTasksDrainedEmpty) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - ASSERT_OK(ke.onCurrentTasksDrained("foo").getNoThrow()); -} - -TEST(KeyedExecutor, onAllCurrentTasksDrainedEmpty) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - ASSERT_OK(ke.onAllCurrentTasksDrained().getNoThrow()); -} - -TEST(KeyedExecutor, retriesFailureWithSpecialCode) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - int count = 2; - - auto run1 = ke.execute("foo", [&count] { - if (--count) { - uasserted(ErrorCodes::KeyedExecutorRetry, "force a retry"); - } - - return true; - }); - - auto run2 = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(count, 1); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(count, 0); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT(run1.get()); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT(run2.get()); -} - -TEST(KeyedExecutor, doesntRetryFailureWithoutSpecialCode) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - auto run1 = ke.execute("foo", [] { uasserted(ErrorCodes::BadValue, "some other code"); }); - - auto run2 = ke.execute("foo", [] { return true; }); - - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_FALSE(run1.isReady()); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 1ul); - ASSERT_THROWS_CODE(run1.get(), DBException, ErrorCodes::BadValue); - ASSERT_FALSE(run2.isReady()); - - ASSERT(me.runOne()); - ASSERT_EQUALS(me.depth(), 0ul); - ASSERT(run2.get()); -} - -TEST(KeyedExecutor, gracefulShutdown) { - MockExecutor me; - KeyedExecutor<std::string> ke(&me); - - Status status = Status::OK(); - - auto adaptForInShutdown = [&](auto&& cb) { - return [&] { - uassertStatusOK(status); - cb(); - }; - }; - - auto run = ke.execute("foo", adaptForInShutdown([] {})); - auto onRunDone = ke.onCurrentTasksDrained("foo"); - auto onAllRunDone = ke.onAllCurrentTasksDrained(); - - status = Status(ErrorCodes::InterruptedAtShutdown, "shutting down"); - me.runAll(); - - ASSERT_THROWS_CODE(run.get(), DBException, ErrorCodes::InterruptedAtShutdown); - onRunDone.get(); - onAllRunDone.get(); -} - -TEST(KeyedExecutor, withThreadsTest) { - ThreadPoolExecutor tpe; - KeyedExecutor<int> ke(&tpe); - tpe.start(); - - constexpr size_t n = (1 << 16); - - stdx::mutex mutex; - stdx::condition_variable condvar; - size_t counter = 0; - - auto incCounter = [&](auto&&) { - stdx::lock_guard<stdx::mutex> lk(mutex); - counter++; - - if (counter == n) { - condvar.notify_one(); - } - }; - - std::mt19937 gen(1); - std::uniform_int_distribution<> keyDistribution(1, 3); - std::uniform_int_distribution<> actionDistribution(1, 100); - - for (size_t i = 0; i < n; ++i) { - auto action = actionDistribution(gen); - - if (action <= 65) { - ke.execute(keyDistribution(gen), [] { - stdx::this_thread::yield(); - }).getAsync(incCounter); - } else if (action <= 90) { - ke.onCurrentTasksDrained(keyDistribution(gen)).getAsync(incCounter); - } else { - ke.onAllCurrentTasksDrained().getAsync(incCounter); - } - } - - stdx::unique_lock<stdx::mutex> lk(mutex); - condvar.wait(lk, [&] { return counter == n; }); - - tpe.shutdown(); - - ASSERT_EQUALS(counter, n); -} - -} // namespace -} // namespace mongo |