diff options
author | Jason Carey <jcarey@argv.me> | 2018-05-17 17:01:29 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-05-25 11:16:55 -0400 |
commit | 333fdd10e3981ef3938dc4a6b5807f2f2da95110 (patch) | |
tree | cf55dc12b92ddd4a636d1bde69750aab8d377ec4 /src/mongo/util/keyed_executor_test.cpp | |
parent | 81f3e81d6b0f6ae23a422b47e9e58262de816cd7 (diff) | |
download | mongo-333fdd10e3981ef3938dc4a6b5807f2f2da95110.tar.gz |
SERVER-35055 keyed executor
This is a thread safe execution primitive for running jobs against an executor with mutual
exclusion and queuing by key.
Features:
Keyed - Tasks are submitted under a key. The keys serve to prevent tasks for a given key from
executing simultaneously. Tasks submitted under different keys may run concurrently.
Queued - If a task is submitted for a key and another task is already running for that key, it
is queued. I.e. tasks are run in FIFO order for a key.
Thread Safe - This is a thread safe type. Any number of callers may invoke the public api
methods simultaneously.
Special Enhancements:
onCurrentTasksDrained- Invoking this method for a key allows a caller to wait until all of the
currently queued tasks for that key have completed.
onAllCurrentTasksDrained- Invoking this method allows a caller to wait until all of the
currently queued tasks for all key have completed.
KeyedExecutorRetry - Throwing or returning KeyedExecutorRetry in a task will cause the task to
be requeued immediately into the executor and retain its place in the
queue.
Diffstat (limited to 'src/mongo/util/keyed_executor_test.cpp')
-rw-r--r-- | src/mongo/util/keyed_executor_test.cpp | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp new file mode 100644 index 00000000000..c050792c2e3 --- /dev/null +++ b/src/mongo/util/keyed_executor_test.cpp @@ -0,0 +1,380 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/util/keyed_executor.h" + +#include <random> +#include <string> +#include <vector> + +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace { + +class MockExecutor : public OutOfLineExecutor { +public: + void schedule(stdx::function<void()> func) override { + _deque.push_front(std::move(func)); + } + + size_t depth() const { + return _deque.size(); + } + + bool runOne() { + if (_deque.empty()) { + return false; + } + + auto x = std::move(_deque.back()); + _deque.pop_back(); + x(); + + return true; + } + + void runAll() { + while (runOne()) { + } + } + +private: + std::deque<stdx::function<void()>> _deque; +}; + +class ThreadPoolExecutor : public OutOfLineExecutor { +public: + ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {} + + void start() { + _threadPool.startup(); + } + + void shutdown() { + _threadPool.shutdown(); + } + + void schedule(stdx::function<void()> func) override { + ASSERT_OK(_threadPool.schedule(std::move(func))); + } + +private: + ThreadPool _threadPool; +}; + +TEST(KeyedExecutor, basicExecute) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + auto run1 = ke.execute("foo", [] { return 1; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + + auto run2 = ke.execute("foo", [] { return 2; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_EQUALS(run1.get(), 1); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT_EQUALS(run2.get(), 2); +} + +TEST(KeyedExecutor, differentKeysDontConflict) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + auto foo = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(foo.isReady()); + + auto bar = ke.execute("bar", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(foo.isReady()); + ASSERT_FALSE(bar.isReady()); + + me.runAll(); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT(foo.get()); + ASSERT(bar.get()); +} + +TEST(KeyedExecutor, onCurrentTasksDrained) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + auto run1 = ke.execute("foo", [] { return true; }); + auto bar = ke.execute("bar", [] { return true; }); + auto onBarDone = ke.onCurrentTasksDrained("bar"); + auto onRun1Done = ke.onCurrentTasksDrained("foo"); + auto run2 = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(onRun1Done.isReady()); + ASSERT_FALSE(bar.isReady()); + ASSERT_FALSE(onBarDone.isReady()); + + auto onRun2Done = ke.onCurrentTasksDrained("foo"); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(onRun1Done.isReady()); + ASSERT_FALSE(onRun2Done.isReady()); + ASSERT_FALSE(bar.isReady()); + ASSERT_FALSE(onBarDone.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT(run1.get()); + ASSERT_OK(onRun1Done.getNoThrow()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(onRun2Done.isReady()); + ASSERT_FALSE(bar.isReady()); + ASSERT_FALSE(onBarDone.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT(bar.get()); + ASSERT_OK(onBarDone.getNoThrow()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT(run2.get()); + ASSERT_OK(onRun2Done.getNoThrow()); +} + +TEST(KeyedExecutor, onAllCurrentTasksDrained) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + auto run1 = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + + auto run2 = ke.execute("bar", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + + auto onAllDone = ke.onAllCurrentTasksDrained(); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(onAllDone.isReady()); + + auto run3 = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(run3.isReady()); + ASSERT_FALSE(onAllDone.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 2ul); + ASSERT(run1.get()); + ASSERT_FALSE(run2.isReady()); + ASSERT_FALSE(onAllDone.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT(run2.get()); + ASSERT_OK(onAllDone.getNoThrow()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT(run3.get()); +} + +TEST(KeyedExecutor, onCurrentTasksDrainedEmpty) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + ASSERT_OK(ke.onCurrentTasksDrained("foo").getNoThrow()); +} + +TEST(KeyedExecutor, onAllCurrentTasksDrainedEmpty) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + ASSERT_OK(ke.onAllCurrentTasksDrained().getNoThrow()); +} + +TEST(KeyedExecutor, retriesFailureWithSpecialCode) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + int count = 2; + + auto run1 = ke.execute("foo", [&count] { + if (--count) { + uasserted(ErrorCodes::KeyedExecutorRetry, "force a retry"); + } + + return true; + }); + + auto run2 = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(count, 1); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(count, 0); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT(run1.get()); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT(run2.get()); +} + +TEST(KeyedExecutor, doesntRetryFailureWithoutSpecialCode) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + auto run1 = ke.execute("foo", [] { uasserted(ErrorCodes::BadValue, "some other code"); }); + + auto run2 = ke.execute("foo", [] { return true; }); + + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_FALSE(run1.isReady()); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 1ul); + ASSERT_THROWS_CODE(run1.get(), DBException, ErrorCodes::BadValue); + ASSERT_FALSE(run2.isReady()); + + ASSERT(me.runOne()); + ASSERT_EQUALS(me.depth(), 0ul); + ASSERT(run2.get()); +} + +TEST(KeyedExecutor, gracefulShutdown) { + MockExecutor me; + KeyedExecutor<std::string> ke(&me); + + Status status = Status::OK(); + + auto adaptForInShutdown = [&](auto&& cb) { + return [&] { + uassertStatusOK(status); + cb(); + }; + }; + + auto run = ke.execute("foo", adaptForInShutdown([] {})); + auto onRunDone = ke.onCurrentTasksDrained("foo"); + auto onAllRunDone = ke.onAllCurrentTasksDrained(); + + status = Status(ErrorCodes::InterruptedAtShutdown, "shutting down"); + me.runAll(); + + ASSERT_THROWS_CODE(run.get(), DBException, ErrorCodes::InterruptedAtShutdown); + onRunDone.get(); + onAllRunDone.get(); +} + +TEST(KeyedExecutor, withThreadsTest) { + ThreadPoolExecutor tpe; + KeyedExecutor<int> ke(&tpe); + tpe.start(); + + constexpr size_t n = (1 << 16); + + stdx::mutex mutex; + stdx::condition_variable condvar; + size_t counter = 0; + + auto incCounter = [&](auto&&) { + stdx::lock_guard<stdx::mutex> lk(mutex); + counter++; + + if (counter == n) { + condvar.notify_one(); + } + }; + + std::mt19937 gen(1); + std::uniform_int_distribution<> keyDistribution(1, 3); + std::uniform_int_distribution<> actionDistribution(1, 100); + + for (size_t i = 0; i < n; ++i) { + auto action = actionDistribution(gen); + + if (action <= 65) { + ke.execute(keyDistribution(gen), [] { + stdx::this_thread::yield(); + }).getAsync(incCounter); + } else if (action <= 90) { + ke.onCurrentTasksDrained(keyDistribution(gen)).getAsync(incCounter); + } else { + ke.onAllCurrentTasksDrained().getAsync(incCounter); + } + } + + stdx::unique_lock<stdx::mutex> lk(mutex); + condvar.wait(lk, [&] { return counter == n; }); + + tpe.shutdown(); + + ASSERT_EQUALS(counter, n); +} + +} // namespace +} // namespace mongo |