diff options
author | Mathias Stearn <mathias@10gen.com> | 2009-12-22 12:13:40 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2009-12-22 12:15:52 -0500 |
commit | a871f11644754ecc283ea601067f1cf9b4ef33ea (patch) | |
tree | edb473c3b981907b612e5360e231918fc2e3c62c /util/thread_pool.cpp | |
parent | 507622d302838c6c5e54f4863f14241e8190ad23 (diff) | |
download | mongo-a871f11644754ecc283ea601067f1cf9b4ef33ea.tar.gz |
Thread pool
Diffstat (limited to 'util/thread_pool.cpp')
-rw-r--r-- | util/thread_pool.cpp | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp new file mode 100644 index 00000000000..64720b9761e --- /dev/null +++ b/util/thread_pool.cpp @@ -0,0 +1,138 @@ +/* threadpool.cpp +*/ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../stdafx.h" +#include "thread_pool.h" +#include "util/mvar.h" + + +namespace mongo{ +namespace threadpool{ + +// Worker thread +struct Worker : boost::noncopyable{ + explicit Worker(ThreadPool& owner) + : _owner(owner) + , _is_done(true) + , _thread(boost::bind(&Worker::loop, this)) + {} + + // destructor will block until current operation is completed + // Acts as a "join" on this thread + ~Worker(){ + _task.put(Task()); + _thread.join(); + } + + void set_task(Task& func){ + assert(!func.empty()); + assert(_is_done); + _is_done = false; + + _task.put(func); + } + + private: + ThreadPool& _owner; + MVar<Task> _task; + bool _is_done; // only used for error detection + boost::thread _thread; + + void loop(){ + while (true) { + Task task = _task.take(); + if (task.empty()) + break; // ends the thread + + try { + task(); + } catch (std::exception e){ + log() << "Unhandled exception in worker thread: " << e.what() << endl;; + } catch (...){ + log() << "Unhandled non-exception in worker thread" << endl; + } + _is_done = true; + _owner.task_done(this); + } + } +}; + +ThreadPool::ThreadPool(int nThreads) + : _tasksRemaining(0) + , _nThreads(nThreads) +{ + boostlock lock(_mutex); + while (nThreads-- > 0){ + Worker* worker = new Worker(*this); + _freeWorkers.push_front(worker); + } +} + +ThreadPool::~ThreadPool(){ + join(); + + assert(_tasks.empty()); + + // O(n) but n should be small + assert(_freeWorkers.size() == (unsigned)_nThreads); + + while(!_freeWorkers.empty()){ + delete _freeWorkers.front(); + _freeWorkers.pop_front(); + } +} + +void ThreadPool::join(){ + while(_tasksRemaining){ + boostlock lock(_mutex); + _condition.wait(lock); + } +} + +void ThreadPool::schedule(Task task){ + boostlock lock(_mutex); + + _tasksRemaining++; + + if (!_freeWorkers.empty()){ + _freeWorkers.front()->set_task(task); + _freeWorkers.pop_front(); + }else{ + _tasks.push_back(task); + } +} + +// should only be called by a worker from the worker thread +void ThreadPool::task_done(Worker* worker){ + boostlock lock(_mutex); + + if (!_tasks.empty()){ + worker->set_task(_tasks.front()); + _tasks.pop_front(); + }else{ + _freeWorkers.push_back(worker); + } + + _tasksRemaining--; + + if(_tasksRemaining == 0) + _condition.notify_all(); +} + +} //namespace threadpool +} //namespace mongo |