diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-03-05 13:40:25 -0500 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-04-05 14:28:40 -0400 |
commit | a08bac3e29cd9ec3d030623894928e80c2f572dd (patch) | |
tree | a82304a07e67909a8d8a40ec08959f4f1c8ccace /src/mongo/executor/thread_pool_mock.cpp | |
parent | b1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff) | |
download | mongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz |
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
Diffstat (limited to 'src/mongo/executor/thread_pool_mock.cpp')
-rw-r--r-- | src/mongo/executor/thread_pool_mock.cpp | 125 |
1 files changed, 65 insertions, 60 deletions
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index a2ffbbac2ba..191537cebff 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; - _net->signalWorkAvailable(); - _net->exitNetwork(); - if (_started) { - if (_worker.joinable()) { - lk.unlock(); - _worker.join(); - lk.lock(); - } - } else { - consumeTasks(&lk); - } - invariant(_tasks.empty()); + if (_joining) + return; + + _shutdown(lk); + _join(lk); } void ThreadPoolMock::startup() { @@ -68,74 +60,87 @@ void ThreadPoolMock::startup() { _worker = stdx::thread([this] { _options.onCreateThread(); stdx::unique_lock<stdx::mutex> lk(_mutex); - consumeTasks(&lk); + + LOG(1) << "Starting to consume tasks"; + while (!_joining) { + if (_tasks.empty()) { + lk.unlock(); + _net->waitForWork(); + lk.lock(); + continue; + } + + _consumeOneTask(lk); + } + LOG(1) << "Done consuming tasks"; }); } void ThreadPoolMock::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _net->signalWorkAvailable(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + _shutdown(lk); } void ThreadPoolMock::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _joining = true; - if (_started) { - stdx::thread toJoin = std::move(_worker); - _net->signalWorkAvailable(); - _net->exitNetwork(); + _join(lk); +} + +void ThreadPoolMock::schedule(Task task) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_inShutdown) { lk.unlock(); - toJoin.join(); - lk.lock(); - invariant(_tasks.empty()); - } else { - consumeTasks(&lk); - invariant(_tasks.empty()); + + task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); + return; } + + _tasks.emplace_back(std::move(task)); } -Status ThreadPoolMock::schedule(Task task) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) { + auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); + if (next + 1 != _tasks.size()) { + std::swap(_tasks[next], _tasks.back()); + } + Task fn = std::move(_tasks.back()); + _tasks.pop_back(); + lk.unlock(); if (_inShutdown) { - return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); + } else { + fn(Status::OK()); } - _tasks.emplace_back(std::move(task)); - return Status::OK(); + lk.lock(); } -void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) { - using std::swap; +void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) { + LOG(1) << "Shutting down pool"; - LOG(1) << "Starting to consume tasks"; - while (!(_inShutdown && _tasks.empty())) { - if (_tasks.empty()) { - lk->unlock(); - _net->waitForWork(); - lk->lock(); - continue; - } - auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); - if (next + 1 != _tasks.size()) { - swap(_tasks[next], _tasks.back()); - } - Task fn = std::move(_tasks.back()); - _tasks.pop_back(); - lk->unlock(); - fn(); - lk->lock(); - } - LOG(1) << "Done consuming tasks"; + _inShutdown = true; + _net->signalWorkAvailable(); +} - invariant(_tasks.empty()); +void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) { + LOG(1) << "Joining pool"; + + _joining = true; + _net->signalWorkAvailable(); + _net->exitNetwork(); - while (_started && !_joining) { - lk->unlock(); - _net->waitForWork(); - lk->lock(); + // Since there is only one worker thread, we need to consume tasks here to potentially + // unblock that thread. + while (!_tasks.empty()) { + _consumeOneTask(lk); } - LOG(1) << "Ready to join"; + if (_started) { + lk.unlock(); + _worker.join(); + lk.lock(); + } + + invariant(_tasks.empty()); } } // namespace executor |