diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2015-07-16 09:08:43 -0500 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2015-07-16 10:08:43 -0400 |
commit | 48f79b30ff75c6310869ea8d0e34925d5f63252f (patch) | |
tree | cf429b96eb779439d198a9ffe6c119e469aa97e6 /src | |
parent | fc7860846700e328c95263132bde4b603dd635c1 (diff) | |
download | mongo-48f79b30ff75c6310869ea8d0e34925d5f63252f.tar.gz |
SERVER-19001 Implementation of ThreadPoolTaskExecutor and basic tests.
In order to support deterministic unit testing, it was also necessary to
introduce a mock implementation of ThreadPool, executor::ThreadPoolMock,
that is tightly integrated with NetworkInterfaceMock to allow for
deterministic unit testing of things that use TaskExecutors.
To keep the ThreadPoolTaskExecutor from having to keep a dedicated
thread for handling scheduleAt(Date_t, ...) processing, a new method,
setAlarm, is introduced to NetworkInterface. setAlarm offers an
extremely relaxed contract, to maximize the number of legal implementations.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/executor/SConscript | 46 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.h | 1 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_impl.cpp | 32 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_impl.h | 57 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 20 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 23 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.cpp | 134 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.h | 78 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 420 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 158 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test.cpp | 96 |
13 files changed, 1051 insertions, 26 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index a5d409c6eac..6083b988bf5 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -25,7 +25,10 @@ env.Library(target='network_interface_impl', # TODO: rename to thread_pool_netwo ]) env.Library('network_interface_mock', - 'network_interface_mock.cpp', + [ + 'network_interface_mock.cpp', + 'thread_pool_mock.cpp', + ], LIBDEPS=[ 'network_interface', ]) @@ -66,11 +69,36 @@ env.Library( 'network_interface_impl', ]) -env.Library('task_executor_test_fixture', - [ - 'task_executor_test_common.cpp', - 'task_executor_test_fixture.cpp'], - LIBDEPS=[ - 'network_interface_mock', - 'task_executor_interface', - ]) +env.Library( + target='task_executor_test_fixture', + source=[ + 'task_executor_test_common.cpp', + 'task_executor_test_fixture.cpp' + ], + LIBDEPS=[ + 'network_interface_mock', + 'task_executor_interface', + ] +) + +env.Library( + target='thread_pool_task_executor', + source=[ + 'thread_pool_task_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/concurrency/thread_pool' + ] +) + +env.CppUnitTest( + target='thread_pool_task_executor_test', + source=[ + 'thread_pool_task_executor_test.cpp', + ], + LIBDEPS=[ + 'thread_pool_task_executor', + 'task_executor_test_fixture', + '$BUILD_DIR/mongo/client/remote_command_runner', + ] +) diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 2d4f8bdfcb3..658b0f23d14 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -116,6 +116,14 @@ public: */ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0; + /** + * Sets an alarm, which schedules "action" to run no sooner than "when". + * + * "action" should not do anything that requires a lot of computation, or that might block for a + * long time, as it may execute in a network thread. + */ + virtual void setAlarm(Date_t when, const stdx::function<void()>& action) = 0; + protected: NetworkInterface(); }; diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index b9d7ab72a53..9163a68dd74 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -141,6 +141,10 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH } } +void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) { + MONGO_UNREACHABLE; +}; + bool NetworkInterfaceASIO::inShutdown() const { return (_state.load() == State::kShutdown); } diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 28955bda89e..eb18aecf38d 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -68,6 +68,7 @@ public: const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; + void setAlarm(Date_t when, const stdx::function<void()>& action) override; bool inShutdown() const; diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp index 7fd876d80b6..c37a371ffe5 100644 --- a/src/mongo/executor/network_interface_impl.cpp +++ b/src/mongo/executor/network_interface_impl.cpp @@ -47,8 +47,8 @@ namespace executor { namespace { -const size_t kMinThreads = 1; -const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room. +const size_t kMinThreads = 2; +const size_t kMaxThreads = 52; // Set to 1 + max repl set size, for heartbeat + wiggle room. const Seconds kMaxIdleThreadAge(30); ThreadPool::Options makeOptions() { @@ -84,6 +84,7 @@ void NetworkInterfaceImpl::startup() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_inShutdown); _pool.startup(); + fassert(27824, _pool.schedule([this]() { _processAlarms(); })); } void NetworkInterfaceImpl::shutdown() { @@ -91,6 +92,7 @@ void NetworkInterfaceImpl::shutdown() { stdx::unique_lock<stdx::mutex> lk(_mutex); _inShutdown = true; _hasPending.notify_all(); + _newAlarmReady.notify_all(); _pool.shutdown(); lk.unlock(); _commandRunner.shutdown(); @@ -190,5 +192,31 @@ std::string NetworkInterfaceImpl::getHostName() { return getHostNameCached(); } +void NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function<void()>& action) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + const bool notify = _alarms.empty() || _alarms.top().when > when; + _alarms.emplace(when, action); + if (notify) { + _newAlarmReady.notify_all(); + } +} + +void NetworkInterfaceImpl::_processAlarms() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_inShutdown) { + if (_alarms.empty()) { + _newAlarmReady.wait(lk); + } else if (now() < _alarms.top().when) { + _newAlarmReady.wait_until(lk, _alarms.top().when.toSystemTimePoint()); + } else { + auto action = _alarms.top().action; + _alarms.pop(); + lk.unlock(); + action(); + lk.lock(); + } + } +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h index 6180e6526b2..cf9af82cca8 100644 --- a/src/mongo/executor/network_interface_impl.h +++ b/src/mongo/executor/network_interface_impl.h @@ -29,6 +29,8 @@ #pragma once +#include <queue> +#include <utility> #include <vector> #include "mongo/client/remote_command_runner_impl.h" @@ -67,25 +69,41 @@ namespace executor { * after they have been connected for a certain maximum period. * TODO(spencer): Rename this to ThreadPoolNetworkInterface */ -class NetworkInterfaceImpl : public NetworkInterface { +class NetworkInterfaceImpl final : public NetworkInterface { public: NetworkInterfaceImpl(); - virtual ~NetworkInterfaceImpl(); - virtual std::string getDiagnosticString(); - virtual void startup(); - virtual void shutdown(); - virtual void waitForWork(); - virtual void waitForWorkUntil(Date_t when); - virtual void signalWorkAvailable(); - virtual Date_t now(); - virtual std::string getHostName(); - virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); - virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); + ~NetworkInterfaceImpl(); + std::string getDiagnosticString() override; + void startup() override; + void shutdown() override; + void waitForWork() override; + void waitForWorkUntil(Date_t when) override; + void signalWorkAvailable() override; + Date_t now() override; + std::string getHostName() override; + void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + const RemoteCommandRequest& request, + const RemoteCommandCompletionFn& onFinish) override; + void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; + void setAlarm(Date_t when, const stdx::function<void()>& action) override; private: /** + * Information describing a scheduled alarm. + */ + struct AlarmInfo { + using AlarmAction = stdx::function<void()>; + AlarmInfo(Date_t inWhen, AlarmAction inAction) + : when(inWhen), action(std::move(inAction)) {} + bool operator>(const AlarmInfo& rhs) const { + return when > rhs.when; + } + + Date_t when; + AlarmAction action; + }; + + /** * Information describing an in-flight command. */ struct CommandData { @@ -101,6 +119,11 @@ private: void _runOneCommand(); /** + * Worker function that processes alarms set via setAlarm. + */ + void _processAlarms(); + + /** * Notifies the network threads that there is work available. */ void _signalWorkAvailable_inlock(); @@ -133,6 +156,12 @@ private: // Number of active network requests size_t _numActiveNetworkRequests = 0; + + // Condition variable to signal in order to wake up the alarm processing thread. + stdx::condition_variable _newAlarmReady; + + // Heap of alarms, with the next alarm always on top. + std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; }; } // namespace executor diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 1553fc4287a..d2f60814016 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -114,6 +114,16 @@ void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbH // No not-in-progress network command matched cbHandle. Oh, well. } +void NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (when <= _now_inlock()) { + lk.unlock(); + action(); + return; + } + _alarms.emplace(when, action); +} + void NetworkInterfaceMock::startup() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_hasStarted); @@ -242,6 +252,9 @@ void NetworkInterfaceMock::runUntil(Date_t until) { break; } Date_t newNow = _executorNextWakeupDate; + if (!_alarms.empty() && _alarms.top().when < newNow) { + newNow = _alarms.top().when; + } if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) { newNow = _scheduled.front().getResponseDate(); } @@ -286,6 +299,13 @@ void NetworkInterfaceMock::signalWorkAvailable() { } void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) { + while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) { + auto fn = _alarms.top().action; + _alarms.pop(); + lk->unlock(); + fn(); + lk->lock(); + } while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { invariant(_currentlyRunning == kNetworkThread); NetworkOperation op = _scheduled.front(); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 00d8487cf8c..92cad82c978 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -28,7 +28,9 @@ #pragma once -#include <map> +#include <queue> +#include <utility> +#include <vector> #include "mongo/executor/network_interface.h" #include "mongo/stdx/condition_variable.h" @@ -87,6 +89,7 @@ public: const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); + virtual void setAlarm(Date_t when, const stdx::function<void()>& action); //////////////////////////////////////////////////////////////////////////////// @@ -165,6 +168,21 @@ public: private: /** + * Information describing a scheduled alarm. + */ + struct AlarmInfo { + using AlarmAction = stdx::function<void()>; + AlarmInfo(Date_t inWhen, AlarmAction inAction) + : when(inWhen), action(std::move(inAction)) {} + bool operator>(const AlarmInfo& rhs) const { + return when > rhs.when; + } + + Date_t when; + AlarmAction action; + }; + + /** * Type used to identify which thread (network mock or executor) is currently executing. * * Values are used in a bitmask, as well. @@ -251,6 +269,9 @@ private: // List of network operations that will not be responded to until shutdown() is called. NetworkOperationList _blackHoled; // (M) + + // Heap of alarms, with the next alarm always on top. + std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M) }; /** diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp new file mode 100644 index 00000000000..ab7c9ddfda4 --- /dev/null +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kExecutor + +#include "mongo/platform/basic.h" + +#include "mongo/executor/thread_pool_mock.h" + +#include "mongo/executor/network_interface_mock.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { + +ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed) + : _prng(prngSeed), _net(net) {} + +ThreadPoolMock::~ThreadPoolMock() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _inShutdown = true; + _net->signalWorkAvailable(); + if (_started) { + if (_worker.joinable()) { + lk.unlock(); + _worker.join(); + lk.lock(); + } + } else { + consumeTasks(&lk); + } + invariant(_tasks.empty()); +} + +void ThreadPoolMock::startup() { + log() << "Starting pool"; + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_started); + invariant(!_worker.joinable()); + _started = true; + _worker = stdx::thread([this] { + stdx::unique_lock<stdx::mutex> lk(_mutex); + consumeTasks(&lk); + }); +} + +void ThreadPoolMock::shutdown() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + _net->signalWorkAvailable(); +} + +void ThreadPoolMock::join() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _joining = true; + if (_started) { + stdx::thread toJoin = std::move(_worker); + _net->signalWorkAvailable(); + lk.unlock(); + toJoin.join(); + lk.lock(); + invariant(_tasks.empty()); + } else { + consumeTasks(&lk); + invariant(_tasks.empty()); + } +} + +Status ThreadPoolMock::schedule(Task task) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_inShutdown) { + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + } + _tasks.emplace_back(std::move(task)); + return Status::OK(); +} + +void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) { + using std::swap; + log() << "Starting to consume tasks"; + while (!(_inShutdown && _tasks.empty())) { + if (_tasks.empty()) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); + continue; + } + auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); + if (next + 1 != _tasks.size()) { + swap(_tasks[next], _tasks.back()); + } + Task fn = std::move(_tasks.back()); + _tasks.pop_back(); + lk->unlock(); + fn(); + lk->lock(); + } + log() << "Done consuming tasks"; + invariant(_tasks.empty()); + while (!_joining) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); + } + log() << "Ready to join"; +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h new file mode 100644 index 00000000000..55e2607e53b --- /dev/null +++ b/src/mongo/executor/thread_pool_mock.h @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <cstdint> +#include <vector> + +#include "mongo/platform/random.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/concurrency/thread_pool_interface.h" + +namespace mongo { +namespace executor { + +class NetworkInterfaceMock; + +/** + * Implementation of a thread pool that is tightly integrated with NetworkInterfaceMock to allow for + * deterministic unit testing of ThreadPoolTaskExecutor. This pool has a single thread, which only + * executes jobs when the NetworkInterfaceMock allows it to (i.e., not when the NetworkInterfaceMock + * is in the "enterNetwork" mode. + */ +class ThreadPoolMock final : public ThreadPoolInterface { +public: + /** + * Create an instance that interlocks with "net". "prngSeed" seeds the pseudorandom number + * generator that is used to determine which schedulable task runs next. + */ + ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed); + ~ThreadPoolMock(); + + void startup() override; + void shutdown() override; + void join() override; + Status schedule(Task task) override; + +private: + void consumeTasks(stdx::unique_lock<stdx::mutex>* lk); + + stdx::mutex _mutex; + stdx::thread _worker; + std::vector<Task> _tasks; + PseudoRandom _prng; + NetworkInterfaceMock* const _net; + bool _started = false; + bool _inShutdown = false; + bool _joining = false; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp new file mode 100644 index 00000000000..86bce4061e7 --- /dev/null +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -0,0 +1,420 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kExecutor + +#include "mongo/platform/basic.h" + +#include "mongo/executor/thread_pool_task_executor.h" + +#include <iterator> + +#include "mongo/base/checked_cast.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/executor/network_interface.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/concurrency/thread_pool_interface.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { + +class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState { + MONGO_DISALLOW_COPYING(CallbackState); + +public: + static std::shared_ptr<CallbackState> make(CallbackFn cb, + EventHandle finishedEvent, + Date_t readyDate) { + return std::make_shared<CallbackState>(std::move(cb), std::move(finishedEvent), readyDate); + } + + /** + * Do not call directly. Use make. + */ + CallbackState(CallbackFn cb, EventHandle theFinishedEvent, Date_t theReadyDate) + : callback(std::move(cb)), + finishedEvent(std::move(theFinishedEvent)), + readyDate(theReadyDate) {} + + virtual ~CallbackState() = default; + + void cancel() override { + MONGO_UNREACHABLE; + } + + void waitForCompletion() override { + MONGO_UNREACHABLE; + } + + // All fields except for "canceled" are guarded by the owning task executor's _mutex. The + // "canceled" field may be observed without holding _mutex, but may only be set while holding + // _mutex. + + CallbackFn callback; + EventHandle finishedEvent; + AtomicUInt32 canceled{0U}; + WorkQueue::iterator iter; + Date_t readyDate; + bool isNetworkOperation = false; +}; + +class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { + MONGO_DISALLOW_COPYING(EventState); + +public: + static std::shared_ptr<EventState> make() { + return std::make_shared<EventState>(); + } + + EventState() = default; + + void signal() override { + MONGO_UNREACHABLE; + } + void waitUntilSignaled() override { + MONGO_UNREACHABLE; + } + bool isSignaled() override { + MONGO_UNREACHABLE; + } + + // All fields guarded by the owning task executor's _mutex. + + bool isSignaledFlag = false; + stdx::condition_variable isSignaledCondition; + EventList::iterator iter; + WorkQueue waiters; +}; + +ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool, + std::unique_ptr<NetworkInterface> net) + : _net(std::move(net)), _pool(std::move(pool)) {} + +ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {} + +void ThreadPoolTaskExecutor::startup() { + _net->startup(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_inShutdown) { + return; + } + _pool->startup(); +} + +void ThreadPoolTaskExecutor::shutdown() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + WorkQueue pending; + pending.splice(pending.end(), _networkInProgressQueue); + pending.splice(pending.end(), _sleepersQueue); + for (auto&& eventState : _unsignaledEvents) { + pending.splice(pending.end(), eventState->waiters); + } + for (auto&& cbState : pending) { + cbState->canceled.store(1); + } + for (auto&& cbState : _poolInProgressQueue) { + cbState->canceled.store(1); + } + scheduleIntoPool_inlock(&pending); + _net->signalWorkAvailable(); + _pool->shutdown(); +} + +void ThreadPoolTaskExecutor::join() { + _pool->join(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_unsignaledEvents.empty()) { + auto eventState = _unsignaledEvents.front(); + invariant(eventState->waiters.empty()); + EventHandle event; + setEventForHandle(&event, std::move(eventState)); + signalEvent_inlock(event); + } + lk.unlock(); + _net->shutdown(); + lk.lock(); + invariant(_poolInProgressQueue.empty()); + invariant(_networkInProgressQueue.empty()); + invariant(_sleepersQueue.empty()); + invariant(_unsignaledEvents.empty()); +} + +std::string ThreadPoolTaskExecutor::getDiagnosticString() { + return {}; +} + +Date_t ThreadPoolTaskExecutor::now() { + return _net->now(); +} + +StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return makeEvent_inlock(); +} + +void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + signalEvent_inlock(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event, + const CallbackFn& work) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!event.isValid()) { + return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; + } + auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); + auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, work); + if (!cbHandle.isOK()) { + return cbHandle; + } + if (eventState->isSignaledFlag) { + scheduleIntoPool_inlock(&eventState->waiters); + } + return cbHandle; +} + +void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { + invariant(event.isValid()); + auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!eventState->isSignaledFlag) { + eventState->isSignaledCondition.wait(lk); + } +} + +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( + const CallbackFn& work) { + WorkQueue temp; + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto cbHandle = enqueueCallbackState_inlock(&temp, work); + if (!cbHandle.isOK()) { + return cbHandle; + } + scheduleIntoPool_inlock(&temp); + return cbHandle; +} + +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( + Date_t when, const CallbackFn& work) { + if (when <= now()) { + return scheduleWork(work); + } + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, work, when); + if (!cbHandle.isOK()) { + return cbHandle; + } + _net->setAlarm(when, + [this, when, cbHandle] { + auto cbState = + checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); + if (cbState->canceled.load()) { + return; + } + invariant(now() >= when); + stdx::lock_guard<stdx::mutex> lk(_mutex); + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter); + }); + + return cbHandle; +} + +namespace { +void remoteCommandFinished(const TaskExecutor::CallbackArgs& cbData, + const TaskExecutor::RemoteCommandCallbackFn& cb, + const RemoteCommandRequest& request, + const TaskExecutor::ResponseStatus& response) { + using ResponseStatus = TaskExecutor::ResponseStatus; + if (cbData.status.isOK()) { + cb(TaskExecutor::RemoteCommandCallbackArgs( + cbData.executor, cbData.myHandle, request, response)); + } else { + cb(TaskExecutor::RemoteCommandCallbackArgs( + cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); + } +} + +void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData, + const TaskExecutor::RemoteCommandCallbackFn& cb, + const RemoteCommandRequest& request) { + using ResponseStatus = TaskExecutor::ResponseStatus; + invariant(!cbData.status.isOK()); + cb(TaskExecutor::RemoteCommandCallbackArgs( + cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); +} +} // namespace + +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + RemoteCommandRequest scheduledRequest = request; + if (request.timeout == RemoteCommandRequest::kNoTimeout) { + scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate; + } else { + scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout; + } + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto cbHandle = + enqueueCallbackState_inlock(&_networkInProgressQueue, + [scheduledRequest, cb](const CallbackArgs& cbData) { + remoteCommandFailedEarly(cbData, cb, scheduledRequest); + }); + if (!cbHandle.isOK()) + return cbHandle; + const auto& cbState = _networkInProgressQueue.back(); + cbState->isNetworkOperation = true; + LOG(4) << "Scheduling remote command request: " << scheduledRequest.toString(); + _net->startCommand(cbHandle.getValue(), + scheduledRequest, + [this, scheduledRequest, cbState, cb](const ResponseStatus& response) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_inShutdown) { + return; + } + LOG(3) << "Received remote response: " + << (response.isOK() ? response.getValue().toString() + : response.getStatus().toString()); + cbState->callback = + [cb, scheduledRequest, response](const CallbackArgs& cbData) { + remoteCommandFinished(cbData, cb, scheduledRequest, response); + }; + scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter); + }); + return cbHandle; +} + +void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { + invariant(cbHandle.isValid()); + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); + stdx::unique_lock<stdx::mutex> lk(_mutex); + cbState->canceled.store(1); + if (cbState->isNetworkOperation) { + lk.unlock(); + _net->cancelCommand(cbHandle); + return; + } + if (cbState->readyDate != Date_t{}) { + // This callback might still be in the sleeper queue; if it is, schedule it now + // rather than when the alarm fires. + auto iter = std::find_if(_sleepersQueue.begin(), + _sleepersQueue.end(), + [cbState](const std::shared_ptr<CallbackState>& other) { + return cbState == other.get(); + }); + if (iter != _sleepersQueue.end()) { + invariant(iter == cbState->iter); + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter); + } + } +} + +void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) { + invariant(cbHandle.isValid()); + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); + waitForEvent(cbState->finishedEvent); +} + +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock( + WorkQueue* queue, CallbackFn work, Date_t when) { + auto event = makeEvent_inlock(); + if (!event.isOK()) { + return event.getStatus(); + } + queue->emplace_back(CallbackState::make(std::move(work), std::move(event.getValue()), when)); + queue->back()->iter = std::prev(queue->end()); + CallbackHandle cbHandle; + setCallbackForHandle(&cbHandle, queue->back()); + return cbHandle; +} + +StatusWith<ThreadPoolTaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent_inlock() { + if (_inShutdown) { + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + } + _unsignaledEvents.emplace_front(EventState::make()); + _unsignaledEvents.front()->iter = _unsignaledEvents.begin(); + EventHandle event; + setEventForHandle(&event, _unsignaledEvents.front()); + return event; +} + +void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event) { + invariant(event.isValid()); + auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); + invariant(!eventState->isSignaledFlag); + eventState->isSignaledFlag = true; + eventState->isSignaledCondition.notify_all(); + scheduleIntoPool_inlock(&eventState->waiters); + _unsignaledEvents.erase(eventState->iter); +} + +void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue) { + scheduleIntoPool_inlock(fromQueue, fromQueue->begin(), fromQueue->end()); +} + +void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, + const WorkQueue::iterator& iter) { + scheduleIntoPool_inlock(fromQueue, iter, std::next(iter)); +} + +void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, + const WorkQueue::iterator& begin, + const WorkQueue::iterator& end) { + dassert(fromQueue != &_poolInProgressQueue); + std::for_each( + begin, + end, + [this](const std::shared_ptr<CallbackState>& cbState) { + fassert(28735, _pool->schedule([this, cbState] { runCallback(std::move(cbState)); })); + }); + _poolInProgressQueue.splice(_poolInProgressQueue.end(), *fromQueue, begin, end); + _net->signalWorkAvailable(); +} + +void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateArg) { + auto cbStatePtr = cbStateArg.get(); + CallbackHandle cbHandle; + setCallbackForHandle(&cbHandle, std::move(cbStateArg)); + CallbackArgs args(this, + std::move(cbHandle), + cbStatePtr->canceled.load() + ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) + : Status::OK()); + cbStatePtr->callback(std::move(args)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (cbStatePtr->finishedEvent.isValid()) { + signalEvent_inlock(cbStatePtr->finishedEvent); + } + _poolInProgressQueue.erase(cbStatePtr->iter); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h new file mode 100644 index 00000000000..af57caa2fb0 --- /dev/null +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" + +namespace mongo { + +class ThreadPoolInterface; + +namespace executor { + +class NetworkInterface; + +/** + * Implementation of a TaskExecutor that uses a pool of threads to execute work items. + */ +class ThreadPoolTaskExecutor final : public TaskExecutor { + MONGO_DISALLOW_COPYING(ThreadPoolTaskExecutor); + +public: + /** + * Constructs an instance of ThreadPoolTaskExecutor that runs tasks in "pool" and uses "net" + * for network operations. + */ + ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool, + std::unique_ptr<NetworkInterface> net); + + /** + * Destroys a ThreadPoolTaskExecutor. + */ + ~ThreadPoolTaskExecutor(); + + void startup() override; + void shutdown() override; + void join() override; + std::string getDiagnosticString() override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + +private: + class CallbackState; + class EventState; + using WorkQueue = stdx::list<std::shared_ptr<CallbackState>>; + using EventList = stdx::list<std::shared_ptr<EventState>>; + + /** + * Creates a new callback on "queue" with the "work" function. If "when" is + * not Date_t{}, the new callback's readyDate is set to "when". + */ + StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, + CallbackFn work, + Date_t when = {}); + + /** + * Makes a new event object. + */ + StatusWith<EventHandle> makeEvent_inlock(); + + /** + * Signals the given event. + */ + void signalEvent_inlock(const EventHandle& event); + + /** + * Schedules all items from "fromQueue" into the thread pool and moves them into + * _poolInProgressQueue. + */ + void scheduleIntoPool_inlock(WorkQueue* fromQueue); + + /** + * Schedules the given item from "fromQueue" into the thread pool and moves it into + * _poolInProgressQueue. + */ + void scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& iter); + + /** + * Schedules entries from "begin" through "end" in "fromQueue" into the thread pool + * and moves them into _poolInProgressQueue. + */ + void scheduleIntoPool_inlock(WorkQueue* fromQueue, + const WorkQueue::iterator& begin, + const WorkQueue::iterator& end); + + /** + * Executes the callback specified by "cbState". + */ + void runCallback(std::shared_ptr<CallbackState> cbState); + + // The network interface used for remote command execution and waiting. + std::unique_ptr<NetworkInterface> _net; + + // The thread pool that executes scheduled work items. + std::unique_ptr<ThreadPoolInterface> _pool; + + // Mutex guarding all remaining fields. + stdx::mutex _mutex; + + // Queue containing all items currently scheduled into the thread pool but not yet completed. + WorkQueue _poolInProgressQueue; + + // Queue containing all items currently scheduled into the network interface. + WorkQueue _networkInProgressQueue; + + // Queue containing all items waiting for a particular point in time to execute. + WorkQueue _sleepersQueue; + + // List of all events that have yet to be signaled. + EventList _unsignaledEvents; + + // Indicates whether or not the executor is shutting down. + bool _inShutdown = false; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp new file mode 100644 index 00000000000..63e8e142f3d --- /dev/null +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/base/init.h" +#include "mongo/base/status.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/task_executor_test_common.h" +#include "mongo/executor/task_executor_test_fixture.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" +#include "mongo/executor/thread_pool_mock.h" + +namespace mongo { +namespace executor { +namespace { + +std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor( + std::unique_ptr<NetworkInterface> net) { + auto netPtr = checked_cast<NetworkInterfaceMock*>(net.get()); + return stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPoolMock>(netPtr, 1), + std::move(net)); +} + +MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { + addTestsForExecutor("ThreadPoolExecutorCommon", + [](std::unique_ptr<NetworkInterface>* net) { + return makeThreadPoolTestExecutor(std::move(*net)); + }); + return Status::OK(); +} + +class ThreadPoolExecutorTest : public TaskExecutorTest { +private: + std::unique_ptr<TaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) override { + return makeThreadPoolTestExecutor(std::move(net)); + } +}; + +void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* outStatus) { + *outStatus = cbData.status; +} + +TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { + auto net = getNet(); + auto& executor = getExecutor(); + launchExecutorThread(); + auto status1 = getDetectableErrorStatus(); + const auto now = net->now(); + const auto cb1 = unittest::assertGet(executor.scheduleWorkAt( + now + Milliseconds(5000), stdx::bind(setStatus, stdx::placeholders::_1, &status1))); + + const auto startTime = net->now(); + net->enterNetwork(); + net->runUntil(startTime + Milliseconds(200)); + net->exitNetwork(); + executor.cancel(cb1); + executor.wait(cb1); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1); + ASSERT_EQUALS(startTime + Milliseconds(200), net->now()); + executor.shutdown(); + joinExecutorThread(); +} + +} // namespace +} // namespace executor +} // namespace mongo |