/**
 * Copyright (C) 2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kExecutor #include "mongo/platform/basic.h" #include "mongo/executor/thread_pool_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/util/log.h" namespace mongo { namespace executor { ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Options options) : _options(std::move(options)), _prng(prngSeed), _net(net) {} ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock lk(_mutex); _inShutdown = true; _net->signalWorkAvailable(); _net->exitNetwork(); if (_started) { if (_worker.joinable()) { lk.unlock(); _worker.join(); lk.lock(); } } else { consumeTasks(&lk); } invariant(_tasks.empty()); } void ThreadPoolMock::startup() { LOG(1) << "Starting pool"; stdx::lock_guard lk(_mutex); invariant(!_started); invariant(!_worker.joinable()); _started = true; _worker = stdx::thread([this] { _options.onCreateThread(); stdx::unique_lock lk(_mutex); consumeTasks(&lk); }); } void ThreadPoolMock::shutdown() { stdx::lock_guard lk(_mutex); _inShutdown = true; _net->signalWorkAvailable(); } void ThreadPoolMock::join() { stdx::unique_lock lk(_mutex); _joining = true; if (_started) { stdx::thread toJoin = std::move(_worker); _net->signalWorkAvailable(); _net->exitNetwork(); lk.unlock(); toJoin.join(); lk.lock(); invariant(_tasks.empty()); } else { consumeTasks(&lk); invariant(_tasks.empty()); } } Status ThreadPoolMock::schedule(Task task) { stdx::lock_guard lk(_mutex); if (_inShutdown) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } _tasks.emplace_back(std::move(task)); return Status::OK(); } void ThreadPoolMock::consumeTasks(stdx::unique_lock* lk) { using std::swap; LOG(1) << "Starting to consume tasks"; while (!(_inShutdown && _tasks.empty())) { if (_tasks.empty()) { lk->unlock(); _net->waitForWork(); lk->lock(); continue; } auto next = static_cast(_prng.nextInt64(static_cast(_tasks.size()))); if (next + 1 != _tasks.size()) { swap(_tasks[next], 
_tasks.back()); } Task fn = std::move(_tasks.back()); _tasks.pop_back(); lk->unlock(); fn(); lk->lock(); } LOG(1) << "Done consuming tasks"; invariant(_tasks.empty()); while (_started && !_joining) { lk->unlock(); _net->waitForWork(); lk->lock(); } LOG(1) << "Ready to join"; } } // namespace executor } // namespace mongo