summaryrefslogtreecommitdiff
path: root/src/mongo/executor/thread_pool_mock.cpp
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-03-05 13:40:25 -0500
committerBen Caimano <ben.caimano@10gen.com>2019-04-05 14:28:40 -0400
commita08bac3e29cd9ec3d030623894928e80c2f572dd (patch)
treea82304a07e67909a8d8a40ec08959f4f1c8ccace /src/mongo/executor/thread_pool_mock.cpp
parentb1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff)
downloadmongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
Diffstat (limited to 'src/mongo/executor/thread_pool_mock.cpp')
-rw-r--r--src/mongo/executor/thread_pool_mock.cpp125
1 files changed, 65 insertions, 60 deletions
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index a2ffbbac2ba..191537cebff 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
- _net->exitNetwork();
- if (_started) {
- if (_worker.joinable()) {
- lk.unlock();
- _worker.join();
- lk.lock();
- }
- } else {
- consumeTasks(&lk);
- }
- invariant(_tasks.empty());
+ if (_joining)
+ return;
+
+ _shutdown(lk);
+ _join(lk);
}
void ThreadPoolMock::startup() {
@@ -68,74 +60,87 @@ void ThreadPoolMock::startup() {
_worker = stdx::thread([this] {
_options.onCreateThread();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- consumeTasks(&lk);
+
+ LOG(1) << "Starting to consume tasks";
+ while (!_joining) {
+ if (_tasks.empty()) {
+ lk.unlock();
+ _net->waitForWork();
+ lk.lock();
+ continue;
+ }
+
+ _consumeOneTask(lk);
+ }
+ LOG(1) << "Done consuming tasks";
});
}
void ThreadPoolMock::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _shutdown(lk);
}
void ThreadPoolMock::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _joining = true;
- if (_started) {
- stdx::thread toJoin = std::move(_worker);
- _net->signalWorkAvailable();
- _net->exitNetwork();
+ _join(lk);
+}
+
+void ThreadPoolMock::schedule(Task task) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
lk.unlock();
- toJoin.join();
- lk.lock();
- invariant(_tasks.empty());
- } else {
- consumeTasks(&lk);
- invariant(_tasks.empty());
+
+ task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ return;
}
+
+ _tasks.emplace_back(std::move(task));
}
-Status ThreadPoolMock::schedule(Task task) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ std::swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk.unlock();
if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ } else {
+ fn(Status::OK());
}
- _tasks.emplace_back(std::move(task));
- return Status::OK();
+ lk.lock();
}
-void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
- using std::swap;
+void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Shutting down pool";
- LOG(1) << "Starting to consume tasks";
- while (!(_inShutdown && _tasks.empty())) {
- if (_tasks.empty()) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
- continue;
- }
- auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
- if (next + 1 != _tasks.size()) {
- swap(_tasks[next], _tasks.back());
- }
- Task fn = std::move(_tasks.back());
- _tasks.pop_back();
- lk->unlock();
- fn();
- lk->lock();
- }
- LOG(1) << "Done consuming tasks";
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+}
- invariant(_tasks.empty());
+void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Joining pool";
+
+ _joining = true;
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
- while (_started && !_joining) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
+ // Since there is only one worker thread, we need to consume tasks here to potentially
+ // unblock that thread.
+ while (!_tasks.empty()) {
+ _consumeOneTask(lk);
}
- LOG(1) << "Ready to join";
+ if (_started) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+
+ invariant(_tasks.empty());
}
} // namespace executor