summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2015-07-16 09:08:43 -0500
committerAndy Schwerin <schwerin@mongodb.com>2015-07-16 10:08:43 -0400
commit48f79b30ff75c6310869ea8d0e34925d5f63252f (patch)
treecf429b96eb779439d198a9ffe6c119e469aa97e6 /src
parentfc7860846700e328c95263132bde4b603dd635c1 (diff)
downloadmongo-48f79b30ff75c6310869ea8d0e34925d5f63252f.tar.gz
SERVER-19001 Implementation of ThreadPoolTaskExecutor and basic tests.
In order to support deterministic unit testing, it was also necessary to introduce a mock implementation of ThreadPool, executor::ThreadPoolMock, that is tightly integrated with NetworkInterfaceMock to allow for deterministic unit testing of things that use TaskExecutors. To keep the ThreadPoolTaskExecutor from having to keep a dedicated thread for handling scheduleAt(Date_t, ...) processing, a new method, setAlarm, is introduced to NetworkInterface. setAlarm offers an extremely relaxed contract, to maximize the number of legal implementations.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/executor/SConscript46
-rw-r--r--src/mongo/executor/network_interface.h8
-rw-r--r--src/mongo/executor/network_interface_asio.cpp4
-rw-r--r--src/mongo/executor/network_interface_asio.h1
-rw-r--r--src/mongo/executor/network_interface_impl.cpp32
-rw-r--r--src/mongo/executor/network_interface_impl.h57
-rw-r--r--src/mongo/executor/network_interface_mock.cpp20
-rw-r--r--src/mongo/executor/network_interface_mock.h23
-rw-r--r--src/mongo/executor/thread_pool_mock.cpp134
-rw-r--r--src/mongo/executor/thread_pool_mock.h78
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp420
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h158
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test.cpp96
13 files changed, 1051 insertions, 26 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index a5d409c6eac..6083b988bf5 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -25,7 +25,10 @@ env.Library(target='network_interface_impl', # TODO: rename to thread_pool_netwo
])
env.Library('network_interface_mock',
- 'network_interface_mock.cpp',
+ [
+ 'network_interface_mock.cpp',
+ 'thread_pool_mock.cpp',
+ ],
LIBDEPS=[
'network_interface',
])
@@ -66,11 +69,36 @@ env.Library(
'network_interface_impl',
])
-env.Library('task_executor_test_fixture',
- [
- 'task_executor_test_common.cpp',
- 'task_executor_test_fixture.cpp'],
- LIBDEPS=[
- 'network_interface_mock',
- 'task_executor_interface',
- ])
+env.Library(
+ target='task_executor_test_fixture',
+ source=[
+ 'task_executor_test_common.cpp',
+ 'task_executor_test_fixture.cpp'
+ ],
+ LIBDEPS=[
+ 'network_interface_mock',
+ 'task_executor_interface',
+ ]
+)
+
+env.Library(
+ target='thread_pool_task_executor',
+ source=[
+ 'thread_pool_task_executor.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool'
+ ]
+)
+
+env.CppUnitTest(
+ target='thread_pool_task_executor_test',
+ source=[
+ 'thread_pool_task_executor_test.cpp',
+ ],
+ LIBDEPS=[
+ 'thread_pool_task_executor',
+ 'task_executor_test_fixture',
+ '$BUILD_DIR/mongo/client/remote_command_runner',
+ ]
+)
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 2d4f8bdfcb3..658b0f23d14 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -116,6 +116,14 @@ public:
*/
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+ /**
+ * Sets an alarm, which schedules "action" to run no sooner than "when".
+ *
+ * "action" should not do anything that requires a lot of computation, or that might block for a
+ * long time, as it may execute in a network thread.
+ */
+ virtual void setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+
protected:
NetworkInterface();
};
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index b9d7ab72a53..9163a68dd74 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -141,6 +141,10 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH
}
}
+void NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+ MONGO_UNREACHABLE;
+};
+
bool NetworkInterfaceASIO::inShutdown() const {
return (_state.load() == State::kShutdown);
}
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 28955bda89e..eb18aecf38d 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -68,6 +68,7 @@ public:
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
+ void setAlarm(Date_t when, const stdx::function<void()>& action) override;
bool inShutdown() const;
diff --git a/src/mongo/executor/network_interface_impl.cpp b/src/mongo/executor/network_interface_impl.cpp
index 7fd876d80b6..c37a371ffe5 100644
--- a/src/mongo/executor/network_interface_impl.cpp
+++ b/src/mongo/executor/network_interface_impl.cpp
@@ -47,8 +47,8 @@ namespace executor {
namespace {
-const size_t kMinThreads = 1;
-const size_t kMaxThreads = 51; // Set to 1 + max repl set size, for heartbeat + wiggle room.
+const size_t kMinThreads = 2;
+const size_t kMaxThreads = 52; // Set to 1 + max repl set size, for heartbeat + wiggle room.
const Seconds kMaxIdleThreadAge(30);
ThreadPool::Options makeOptions() {
@@ -84,6 +84,7 @@ void NetworkInterfaceImpl::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_inShutdown);
_pool.startup();
+ fassert(27824, _pool.schedule([this]() { _processAlarms(); }));
}
void NetworkInterfaceImpl::shutdown() {
@@ -91,6 +92,7 @@ void NetworkInterfaceImpl::shutdown() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_inShutdown = true;
_hasPending.notify_all();
+ _newAlarmReady.notify_all();
_pool.shutdown();
lk.unlock();
_commandRunner.shutdown();
@@ -190,5 +192,31 @@ std::string NetworkInterfaceImpl::getHostName() {
return getHostNameCached();
}
+void NetworkInterfaceImpl::setAlarm(Date_t when, const stdx::function<void()>& action) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ const bool notify = _alarms.empty() || _alarms.top().when > when;
+ _alarms.emplace(when, action);
+ if (notify) {
+ _newAlarmReady.notify_all();
+ }
+}
+
+void NetworkInterfaceImpl::_processAlarms() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_inShutdown) {
+ if (_alarms.empty()) {
+ _newAlarmReady.wait(lk);
+ } else if (now() < _alarms.top().when) {
+ _newAlarmReady.wait_until(lk, _alarms.top().when.toSystemTimePoint());
+ } else {
+ auto action = _alarms.top().action;
+ _alarms.pop();
+ lk.unlock();
+ action();
+ lk.lock();
+ }
+ }
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_impl.h b/src/mongo/executor/network_interface_impl.h
index 6180e6526b2..cf9af82cca8 100644
--- a/src/mongo/executor/network_interface_impl.h
+++ b/src/mongo/executor/network_interface_impl.h
@@ -29,6 +29,8 @@
#pragma once
+#include <queue>
+#include <utility>
#include <vector>
#include "mongo/client/remote_command_runner_impl.h"
@@ -67,25 +69,41 @@ namespace executor {
* after they have been connected for a certain maximum period.
* TODO(spencer): Rename this to ThreadPoolNetworkInterface
*/
-class NetworkInterfaceImpl : public NetworkInterface {
+class NetworkInterfaceImpl final : public NetworkInterface {
public:
NetworkInterfaceImpl();
- virtual ~NetworkInterfaceImpl();
- virtual std::string getDiagnosticString();
- virtual void startup();
- virtual void shutdown();
- virtual void waitForWork();
- virtual void waitForWorkUntil(Date_t when);
- virtual void signalWorkAvailable();
- virtual Date_t now();
- virtual std::string getHostName();
- virtual void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+ ~NetworkInterfaceImpl();
+ std::string getDiagnosticString() override;
+ void startup() override;
+ void shutdown() override;
+ void waitForWork() override;
+ void waitForWorkUntil(Date_t when) override;
+ void signalWorkAvailable() override;
+ Date_t now() override;
+ std::string getHostName() override;
+ void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const RemoteCommandRequest& request,
+ const RemoteCommandCompletionFn& onFinish) override;
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
+ void setAlarm(Date_t when, const stdx::function<void()>& action) override;
private:
/**
+ * Information describing a scheduled alarm.
+ */
+ struct AlarmInfo {
+ using AlarmAction = stdx::function<void()>;
+ AlarmInfo(Date_t inWhen, AlarmAction inAction)
+ : when(inWhen), action(std::move(inAction)) {}
+ bool operator>(const AlarmInfo& rhs) const {
+ return when > rhs.when;
+ }
+
+ Date_t when;
+ AlarmAction action;
+ };
+
+ /**
* Information describing an in-flight command.
*/
struct CommandData {
@@ -101,6 +119,11 @@ private:
void _runOneCommand();
/**
+ * Worker function that processes alarms set via setAlarm.
+ */
+ void _processAlarms();
+
+ /**
* Notifies the network threads that there is work available.
*/
void _signalWorkAvailable_inlock();
@@ -133,6 +156,12 @@ private:
// Number of active network requests
size_t _numActiveNetworkRequests = 0;
+
+ // Condition variable to signal in order to wake up the alarm processing thread.
+ stdx::condition_variable _newAlarmReady;
+
+ // Heap of alarms, with the next alarm always on top.
+ std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms;
};
} // namespace executor
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 1553fc4287a..d2f60814016 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -114,6 +114,16 @@ void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbH
// No not-in-progress network command matched cbHandle. Oh, well.
}
+void NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (when <= _now_inlock()) {
+ lk.unlock();
+ action();
+ return;
+ }
+ _alarms.emplace(when, action);
+}
+
void NetworkInterfaceMock::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_hasStarted);
@@ -242,6 +252,9 @@ void NetworkInterfaceMock::runUntil(Date_t until) {
break;
}
Date_t newNow = _executorNextWakeupDate;
+ if (!_alarms.empty() && _alarms.top().when < newNow) {
+ newNow = _alarms.top().when;
+ }
if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
newNow = _scheduled.front().getResponseDate();
}
@@ -286,6 +299,13 @@ void NetworkInterfaceMock::signalWorkAvailable() {
}
void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+ while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
+ auto fn = _alarms.top().action;
+ _alarms.pop();
+ lk->unlock();
+ fn();
+ lk->lock();
+ }
while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
invariant(_currentlyRunning == kNetworkThread);
NetworkOperation op = _scheduled.front();
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 00d8487cf8c..92cad82c978 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -28,7 +28,9 @@
#pragma once
-#include <map>
+#include <queue>
+#include <utility>
+#include <vector>
#include "mongo/executor/network_interface.h"
#include "mongo/stdx/condition_variable.h"
@@ -87,6 +89,7 @@ public:
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish);
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+ virtual void setAlarm(Date_t when, const stdx::function<void()>& action);
////////////////////////////////////////////////////////////////////////////////
@@ -165,6 +168,21 @@ public:
private:
/**
+ * Information describing a scheduled alarm.
+ */
+ struct AlarmInfo {
+ using AlarmAction = stdx::function<void()>;
+ AlarmInfo(Date_t inWhen, AlarmAction inAction)
+ : when(inWhen), action(std::move(inAction)) {}
+ bool operator>(const AlarmInfo& rhs) const {
+ return when > rhs.when;
+ }
+
+ Date_t when;
+ AlarmAction action;
+ };
+
+ /**
* Type used to identify which thread (network mock or executor) is currently executing.
*
* Values are used in a bitmask, as well.
@@ -251,6 +269,9 @@ private:
// List of network operations that will not be responded to until shutdown() is called.
NetworkOperationList _blackHoled; // (M)
+
+ // Heap of alarms, with the next alarm always on top.
+ std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M)
};
/**
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
new file mode 100644
index 00000000000..ab7c9ddfda4
--- /dev/null
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kExecutor
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/thread_pool_mock.h"
+
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace executor {
+
+ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed)
+ : _prng(prngSeed), _net(net) {}
+
+ThreadPoolMock::~ThreadPoolMock() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+ if (_started) {
+ if (_worker.joinable()) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+ } else {
+ consumeTasks(&lk);
+ }
+ invariant(_tasks.empty());
+}
+
+void ThreadPoolMock::startup() {
+ log() << "Starting pool";
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_started);
+ invariant(!_worker.joinable());
+ _started = true;
+ _worker = stdx::thread([this] {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ consumeTasks(&lk);
+ });
+}
+
+void ThreadPoolMock::shutdown() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+}
+
+void ThreadPoolMock::join() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _joining = true;
+ if (_started) {
+ stdx::thread toJoin = std::move(_worker);
+ _net->signalWorkAvailable();
+ lk.unlock();
+ toJoin.join();
+ lk.lock();
+ invariant(_tasks.empty());
+ } else {
+ consumeTasks(&lk);
+ invariant(_tasks.empty());
+ }
+}
+
+Status ThreadPoolMock::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+ _tasks.emplace_back(std::move(task));
+ return Status::OK();
+}
+
+void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
+ using std::swap;
+ log() << "Starting to consume tasks";
+ while (!(_inShutdown && _tasks.empty())) {
+ if (_tasks.empty()) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
+ continue;
+ }
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk->unlock();
+ fn();
+ lk->lock();
+ }
+ log() << "Done consuming tasks";
+ invariant(_tasks.empty());
+ while (!_joining) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
+ }
+ log() << "Ready to join";
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
new file mode 100644
index 00000000000..55e2607e53b
--- /dev/null
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "mongo/platform/random.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/concurrency/thread_pool_interface.h"
+
+namespace mongo {
+namespace executor {
+
+class NetworkInterfaceMock;
+
+/**
+ * Implementation of a thread pool that is tightly integrated with NetworkInterfaceMock to allow for
+ * deterministic unit testing of ThreadPoolTaskExecutor. This pool has a single thread, which only
+ * executes jobs when the NetworkInterfaceMock allows it to (i.e., not when the NetworkInterfaceMock
+ * is in the "enterNetwork" mode.
+ */
+class ThreadPoolMock final : public ThreadPoolInterface {
+public:
+ /**
+ * Create an instance that interlocks with "net". "prngSeed" seeds the pseudorandom number
+ * generator that is used to determine which schedulable task runs next.
+ */
+ ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed);
+ ~ThreadPoolMock();
+
+ void startup() override;
+ void shutdown() override;
+ void join() override;
+ Status schedule(Task task) override;
+
+private:
+ void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
+
+ stdx::mutex _mutex;
+ stdx::thread _worker;
+ std::vector<Task> _tasks;
+ PseudoRandom _prng;
+ NetworkInterfaceMock* const _net;
+ bool _started = false;
+ bool _inShutdown = false;
+ bool _joining = false;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
new file mode 100644
index 00000000000..86bce4061e7
--- /dev/null
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -0,0 +1,420 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kExecutor
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/thread_pool_task_executor.h"
+
+#include <iterator>
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/concurrency/thread_pool_interface.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace executor {
+
+class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState {
+ MONGO_DISALLOW_COPYING(CallbackState);
+
+public:
+ static std::shared_ptr<CallbackState> make(CallbackFn cb,
+ EventHandle finishedEvent,
+ Date_t readyDate) {
+ return std::make_shared<CallbackState>(std::move(cb), std::move(finishedEvent), readyDate);
+ }
+
+ /**
+ * Do not call directly. Use make.
+ */
+ CallbackState(CallbackFn cb, EventHandle theFinishedEvent, Date_t theReadyDate)
+ : callback(std::move(cb)),
+ finishedEvent(std::move(theFinishedEvent)),
+ readyDate(theReadyDate) {}
+
+ virtual ~CallbackState() = default;
+
+ void cancel() override {
+ MONGO_UNREACHABLE;
+ }
+
+ void waitForCompletion() override {
+ MONGO_UNREACHABLE;
+ }
+
+ // All fields except for "canceled" are guarded by the owning task executor's _mutex. The
+ // "canceled" field may be observed without holding _mutex, but may only be set while holding
+ // _mutex.
+
+ CallbackFn callback;
+ EventHandle finishedEvent;
+ AtomicUInt32 canceled{0U};
+ WorkQueue::iterator iter;
+ Date_t readyDate;
+ bool isNetworkOperation = false;
+};
+
+class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
+ MONGO_DISALLOW_COPYING(EventState);
+
+public:
+ static std::shared_ptr<EventState> make() {
+ return std::make_shared<EventState>();
+ }
+
+ EventState() = default;
+
+ void signal() override {
+ MONGO_UNREACHABLE;
+ }
+ void waitUntilSignaled() override {
+ MONGO_UNREACHABLE;
+ }
+ bool isSignaled() override {
+ MONGO_UNREACHABLE;
+ }
+
+ // All fields guarded by the owning task executor's _mutex.
+
+ bool isSignaledFlag = false;
+ stdx::condition_variable isSignaledCondition;
+ EventList::iterator iter;
+ WorkQueue waiters;
+};
+
+ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
+ std::unique_ptr<NetworkInterface> net)
+ : _net(std::move(net)), _pool(std::move(pool)) {}
+
+ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {}
+
+void ThreadPoolTaskExecutor::startup() {
+ _net->startup();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
+ return;
+ }
+ _pool->startup();
+}
+
+void ThreadPoolTaskExecutor::shutdown() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ WorkQueue pending;
+ pending.splice(pending.end(), _networkInProgressQueue);
+ pending.splice(pending.end(), _sleepersQueue);
+ for (auto&& eventState : _unsignaledEvents) {
+ pending.splice(pending.end(), eventState->waiters);
+ }
+ for (auto&& cbState : pending) {
+ cbState->canceled.store(1);
+ }
+ for (auto&& cbState : _poolInProgressQueue) {
+ cbState->canceled.store(1);
+ }
+ scheduleIntoPool_inlock(&pending);
+ _net->signalWorkAvailable();
+ _pool->shutdown();
+}
+
+void ThreadPoolTaskExecutor::join() {
+ _pool->join();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_unsignaledEvents.empty()) {
+ auto eventState = _unsignaledEvents.front();
+ invariant(eventState->waiters.empty());
+ EventHandle event;
+ setEventForHandle(&event, std::move(eventState));
+ signalEvent_inlock(event);
+ }
+ lk.unlock();
+ _net->shutdown();
+ lk.lock();
+ invariant(_poolInProgressQueue.empty());
+ invariant(_networkInProgressQueue.empty());
+ invariant(_sleepersQueue.empty());
+ invariant(_unsignaledEvents.empty());
+}
+
+std::string ThreadPoolTaskExecutor::getDiagnosticString() {
+ return {};
+}
+
+Date_t ThreadPoolTaskExecutor::now() {
+ return _net->now();
+}
+
+StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return makeEvent_inlock();
+}
+
+void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ signalEvent_inlock(event);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event,
+ const CallbackFn& work) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!event.isValid()) {
+ return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
+ }
+ auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
+ auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, work);
+ if (!cbHandle.isOK()) {
+ return cbHandle;
+ }
+ if (eventState->isSignaledFlag) {
+ scheduleIntoPool_inlock(&eventState->waiters);
+ }
+ return cbHandle;
+}
+
+void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
+ invariant(event.isValid());
+ auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!eventState->isSignaledFlag) {
+ eventState->isSignaledCondition.wait(lk);
+ }
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
+ const CallbackFn& work) {
+ WorkQueue temp;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto cbHandle = enqueueCallbackState_inlock(&temp, work);
+ if (!cbHandle.isOK()) {
+ return cbHandle;
+ }
+ scheduleIntoPool_inlock(&temp);
+ return cbHandle;
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
+ Date_t when, const CallbackFn& work) {
+ if (when <= now()) {
+ return scheduleWork(work);
+ }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, work, when);
+ if (!cbHandle.isOK()) {
+ return cbHandle;
+ }
+ _net->setAlarm(when,
+ [this, when, cbHandle] {
+ auto cbState =
+ checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
+ if (cbState->canceled.load()) {
+ return;
+ }
+ invariant(now() >= when);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter);
+ });
+
+ return cbHandle;
+}
+
+namespace {
+void remoteCommandFinished(const TaskExecutor::CallbackArgs& cbData,
+ const TaskExecutor::RemoteCommandCallbackFn& cb,
+ const RemoteCommandRequest& request,
+ const TaskExecutor::ResponseStatus& response) {
+ using ResponseStatus = TaskExecutor::ResponseStatus;
+ if (cbData.status.isOK()) {
+ cb(TaskExecutor::RemoteCommandCallbackArgs(
+ cbData.executor, cbData.myHandle, request, response));
+ } else {
+ cb(TaskExecutor::RemoteCommandCallbackArgs(
+ cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
+ }
+}
+
+void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
+ const TaskExecutor::RemoteCommandCallbackFn& cb,
+ const RemoteCommandRequest& request) {
+ using ResponseStatus = TaskExecutor::ResponseStatus;
+ invariant(!cbData.status.isOK());
+ cb(TaskExecutor::RemoteCommandCallbackArgs(
+ cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
+}
+} // namespace
+
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
+ const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ RemoteCommandRequest scheduledRequest = request;
+ if (request.timeout == RemoteCommandRequest::kNoTimeout) {
+ scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
+ } else {
+ scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout;
+ }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto cbHandle =
+ enqueueCallbackState_inlock(&_networkInProgressQueue,
+ [scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ });
+ if (!cbHandle.isOK())
+ return cbHandle;
+ const auto& cbState = _networkInProgressQueue.back();
+ cbState->isNetworkOperation = true;
+ LOG(4) << "Scheduling remote command request: " << scheduledRequest.toString();
+ _net->startCommand(cbHandle.getValue(),
+ scheduledRequest,
+ [this, scheduledRequest, cbState, cb](const ResponseStatus& response) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
+ return;
+ }
+ LOG(3) << "Received remote response: "
+ << (response.isOK() ? response.getValue().toString()
+ : response.getStatus().toString());
+ cbState->callback =
+ [cb, scheduledRequest, response](const CallbackArgs& cbData) {
+ remoteCommandFinished(cbData, cb, scheduledRequest, response);
+ };
+ scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter);
+ });
+ return cbHandle;
+}
+
+void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
+ invariant(cbHandle.isValid());
+ auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ cbState->canceled.store(1);
+ if (cbState->isNetworkOperation) {
+ lk.unlock();
+ _net->cancelCommand(cbHandle);
+ return;
+ }
+ if (cbState->readyDate != Date_t{}) {
+ // This callback might still be in the sleeper queue; if it is, schedule it now
+ // rather than when the alarm fires.
+ auto iter = std::find_if(_sleepersQueue.begin(),
+ _sleepersQueue.end(),
+ [cbState](const std::shared_ptr<CallbackState>& other) {
+ return cbState == other.get();
+ });
+ if (iter != _sleepersQueue.end()) {
+ invariant(iter == cbState->iter);
+ scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter);
+ }
+ }
+}
+
+void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) {
+ invariant(cbHandle.isValid());
+ auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
+ waitForEvent(cbState->finishedEvent);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
+ WorkQueue* queue, CallbackFn work, Date_t when) {
+ auto event = makeEvent_inlock();
+ if (!event.isOK()) {
+ return event.getStatus();
+ }
+ queue->emplace_back(CallbackState::make(std::move(work), std::move(event.getValue()), when));
+ queue->back()->iter = std::prev(queue->end());
+ CallbackHandle cbHandle;
+ setCallbackForHandle(&cbHandle, queue->back());
+ return cbHandle;
+}
+
+StatusWith<ThreadPoolTaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent_inlock() {
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+ _unsignaledEvents.emplace_front(EventState::make());
+ _unsignaledEvents.front()->iter = _unsignaledEvents.begin();
+ EventHandle event;
+ setEventForHandle(&event, _unsignaledEvents.front());
+ return event;
+}
+
+void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event) {
+ invariant(event.isValid());
+ auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
+ invariant(!eventState->isSignaledFlag);
+ eventState->isSignaledFlag = true;
+ eventState->isSignaledCondition.notify_all();
+ scheduleIntoPool_inlock(&eventState->waiters);
+ _unsignaledEvents.erase(eventState->iter);
+}
+
+void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue) {
+ scheduleIntoPool_inlock(fromQueue, fromQueue->begin(), fromQueue->end());
+}
+
+void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
+ const WorkQueue::iterator& iter) {
+ scheduleIntoPool_inlock(fromQueue, iter, std::next(iter));
+}
+
+void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
+ const WorkQueue::iterator& begin,
+ const WorkQueue::iterator& end) {
+ dassert(fromQueue != &_poolInProgressQueue);
+ std::for_each(
+ begin,
+ end,
+ [this](const std::shared_ptr<CallbackState>& cbState) {
+ fassert(28735, _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }));
+ });
+ _poolInProgressQueue.splice(_poolInProgressQueue.end(), *fromQueue, begin, end);
+ _net->signalWorkAvailable();
+}
+
+void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateArg) {
+ auto cbStatePtr = cbStateArg.get();
+ CallbackHandle cbHandle;
+ setCallbackForHandle(&cbHandle, std::move(cbStateArg));
+ CallbackArgs args(this,
+ std::move(cbHandle),
+ cbStatePtr->canceled.load()
+ ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
+ : Status::OK());
+ cbStatePtr->callback(std::move(args));
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (cbStatePtr->finishedEvent.isValid()) {
+ signalEvent_inlock(cbStatePtr->finishedEvent);
+ }
+ _poolInProgressQueue.erase(cbStatePtr->iter);
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
new file mode 100644
index 00000000000..af57caa2fb0
--- /dev/null
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/list.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+class ThreadPoolInterface;
+
+namespace executor {
+
+class NetworkInterface;
+
+/**
+ * Implementation of a TaskExecutor that uses a pool of threads to execute work items.
+ */
+class ThreadPoolTaskExecutor final : public TaskExecutor {
+ MONGO_DISALLOW_COPYING(ThreadPoolTaskExecutor);
+
+public:
+ /**
+ * Constructs an instance of ThreadPoolTaskExecutor that runs tasks in "pool" and uses "net"
+ * for network operations.
+ */
+ ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
+ std::unique_ptr<NetworkInterface> net);
+
+ /**
+ * Destroys a ThreadPoolTaskExecutor.
+ */
+ ~ThreadPoolTaskExecutor();
+
+ void startup() override;
+ void shutdown() override;
+ void join() override;
+ std::string getDiagnosticString() override;
+ Date_t now() override;
+ StatusWith<EventHandle> makeEvent() override;
+ void signalEvent(const EventHandle& event) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
+ void waitForEvent(const EventHandle& event) override;
+ StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override;
+ void cancel(const CallbackHandle& cbHandle) override;
+ void wait(const CallbackHandle& cbHandle) override;
+
+private:
+ class CallbackState;
+ class EventState;
+ using WorkQueue = stdx::list<std::shared_ptr<CallbackState>>;
+ using EventList = stdx::list<std::shared_ptr<EventState>>;
+
+ /**
+ * Creates a new callback on "queue" with the "work" function. If "when" is
+ * not Date_t{}, the new callback's readyDate is set to "when".
+ */
+ StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue,
+ CallbackFn work,
+ Date_t when = {});
+
+ /**
+ * Makes a new event object.
+ */
+ StatusWith<EventHandle> makeEvent_inlock();
+
+ /**
+ * Signals the given event.
+ */
+ void signalEvent_inlock(const EventHandle& event);
+
+ /**
+ * Schedules all items from "fromQueue" into the thread pool and moves them into
+ * _poolInProgressQueue.
+ */
+ void scheduleIntoPool_inlock(WorkQueue* fromQueue);
+
+ /**
+ * Schedules the given item from "fromQueue" into the thread pool and moves it into
+ * _poolInProgressQueue.
+ */
+ void scheduleIntoPool_inlock(WorkQueue* fromQueue, const WorkQueue::iterator& iter);
+
+ /**
+ * Schedules entries from "begin" through "end" in "fromQueue" into the thread pool
+ * and moves them into _poolInProgressQueue.
+ */
+ void scheduleIntoPool_inlock(WorkQueue* fromQueue,
+ const WorkQueue::iterator& begin,
+ const WorkQueue::iterator& end);
+
+ /**
+ * Executes the callback specified by "cbState".
+ */
+ void runCallback(std::shared_ptr<CallbackState> cbState);
+
+ // The network interface used for remote command execution and waiting.
+ std::unique_ptr<NetworkInterface> _net;
+
+ // The thread pool that executes scheduled work items.
+ std::unique_ptr<ThreadPoolInterface> _pool;
+
+ // Mutex guarding all remaining fields.
+ stdx::mutex _mutex;
+
+ // Queue containing all items currently scheduled into the thread pool but not yet completed.
+ WorkQueue _poolInProgressQueue;
+
+ // Queue containing all items currently scheduled into the network interface.
+ WorkQueue _networkInProgressQueue;
+
+ // Queue containing all items waiting for a particular point in time to execute.
+ WorkQueue _sleepersQueue;
+
+ // List of all events that have yet to be signaled.
+ EventList _unsignaledEvents;
+
+ // Indicates whether or not the executor is shutting down.
+ bool _inShutdown = false;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
new file mode 100644
index 00000000000..63e8e142f3d
--- /dev/null
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/base/init.h"
+#include "mongo/base/status.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor_test_common.h"
+#include "mongo/executor/task_executor_test_fixture.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/executor/thread_pool_mock.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor(
+ std::unique_ptr<NetworkInterface> net) {
+ auto netPtr = checked_cast<NetworkInterfaceMock*>(net.get());
+ return stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPoolMock>(netPtr, 1),
+ std::move(net));
+}
+
+MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) {
+ addTestsForExecutor("ThreadPoolExecutorCommon",
+ [](std::unique_ptr<NetworkInterface>* net) {
+ return makeThreadPoolTestExecutor(std::move(*net));
+ });
+ return Status::OK();
+}
+
+class ThreadPoolExecutorTest : public TaskExecutorTest {
+private:
+ std::unique_ptr<TaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) override {
+ return makeThreadPoolTestExecutor(std::move(net));
+ }
+};
+
+void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* outStatus) {
+ *outStatus = cbData.status;
+}
+
+TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) {
+ auto net = getNet();
+ auto& executor = getExecutor();
+ launchExecutorThread();
+ auto status1 = getDetectableErrorStatus();
+ const auto now = net->now();
+ const auto cb1 = unittest::assertGet(executor.scheduleWorkAt(
+ now + Milliseconds(5000), stdx::bind(setStatus, stdx::placeholders::_1, &status1)));
+
+ const auto startTime = net->now();
+ net->enterNetwork();
+ net->runUntil(startTime + Milliseconds(200));
+ net->exitNetwork();
+ executor.cancel(cb1);
+ executor.wait(cb1);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1);
+ ASSERT_EQUALS(startTime + Milliseconds(200), net->now());
+ executor.shutdown();
+ joinExecutorThread();
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo