/* threadpool.cpp */ /* Copyright 2009 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "stdafx.h" #include "thread_pool.h" #include "mvar.h" namespace mongo{ namespace threadpool{ // Worker thread class Worker : boost::noncopyable { public: explicit Worker(ThreadPool& owner) : _owner(owner) , _is_done(true) , _thread(boost::bind(&Worker::loop, this)) {} // destructor will block until current operation is completed // Acts as a "join" on this thread ~Worker(){ _task.put(Task()); _thread.join(); } void set_task(Task& func){ assert(!func.empty()); assert(_is_done); _is_done = false; _task.put(func); } private: ThreadPool& _owner; MVar _task; bool _is_done; // only used for error detection boost::thread _thread; void loop(){ while (true) { Task task = _task.take(); if (task.empty()) break; // ends the thread try { task(); } catch (std::exception e){ log() << "Unhandled exception in worker thread: " << e.what() << endl;; } catch (...){ log() << "Unhandled non-exception in worker thread" << endl; } _is_done = true; _owner.task_done(this); } } }; ThreadPool::ThreadPool(int nThreads) : _tasksRemaining(0) , _nThreads(nThreads) { scoped_lock lock(_mutex); while (nThreads-- > 0){ Worker* worker = new Worker(*this); _freeWorkers.push_front(worker); } } ThreadPool::~ThreadPool(){ join(); assert(_tasks.empty()); // O(n) but n should be small assert(_freeWorkers.size() == (unsigned)_nThreads); while(!_freeWorkers.empty()){ delete _freeWorkers.front(); _freeWorkers.pop_front(); } } void ThreadPool::join(){ scoped_lock lock(_mutex); while(_tasksRemaining){ _condition.wait(lock.boost()); } } void ThreadPool::schedule(Task task){ scoped_lock lock(_mutex); _tasksRemaining++; if (!_freeWorkers.empty()){ _freeWorkers.front()->set_task(task); _freeWorkers.pop_front(); }else{ _tasks.push_back(task); } } // should only be called by a worker from the worker thread void ThreadPool::task_done(Worker* worker){ scoped_lock lock(_mutex); if (!_tasks.empty()){ worker->set_task(_tasks.front()); _tasks.pop_front(); }else{ _freeWorkers.push_front(worker); } _tasksRemaining--; if(_tasksRemaining == 0) _condition.notify_all(); } } //namespace threadpool } //namespace mongo