diff options
author | Mathias Stearn <mathias@10gen.com> | 2009-12-22 12:13:40 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2009-12-22 12:15:52 -0500 |
commit | a871f11644754ecc283ea601067f1cf9b4ef33ea (patch) | |
tree | edb473c3b981907b612e5360e231918fc2e3c62c | |
parent | 507622d302838c6c5e54f4863f14241e8190ad23 (diff) | |
download | mongo-a871f11644754ecc283ea601067f1cf9b4ef33ea.tar.gz |
Thread pool
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | dbtests/threadedtests.cpp | 28 | ||||
-rw-r--r-- | util/thread_pool.cpp | 138 | ||||
-rw-r--r-- | util/thread_pool.h | 83 |
4 files changed, 250 insertions, 1 deletions
diff --git a/SConstruct b/SConstruct index 7c81ae70a0b..65093d4ab87 100644 --- a/SConstruct +++ b/SConstruct @@ -329,7 +329,7 @@ else: coreDbFiles = [] coreServerFiles = [ "util/message_server_port.cpp" , "util/message_server_asio.cpp" ] -serverOnlyFiles = Split( "db/query.cpp db/update.cpp db/introspect.cpp db/btree.cpp db/clientcursor.cpp db/tests.cpp db/repl.cpp db/btreecursor.cpp db/cloner.cpp db/namespace.cpp db/matcher.cpp db/dbeval.cpp db/dbwebserver.cpp db/dbinfo.cpp db/dbhelpers.cpp db/instance.cpp db/pdfile.cpp db/cursor.cpp db/security_commands.cpp db/client.cpp db/security.cpp util/miniwebserver.cpp db/storage.cpp db/reccache.cpp db/queryoptimizer.cpp db/extsort.cpp db/mr.cpp s/d_util.cpp" ) +serverOnlyFiles = Split( "db/query.cpp db/update.cpp db/introspect.cpp db/btree.cpp db/clientcursor.cpp db/tests.cpp db/repl.cpp db/btreecursor.cpp db/cloner.cpp db/namespace.cpp db/matcher.cpp db/dbeval.cpp db/dbwebserver.cpp db/dbinfo.cpp db/dbhelpers.cpp db/instance.cpp db/pdfile.cpp db/cursor.cpp db/security_commands.cpp db/client.cpp db/security.cpp util/miniwebserver.cpp db/storage.cpp db/reccache.cpp db/queryoptimizer.cpp db/extsort.cpp db/mr.cpp s/d_util.cpp util/thread_pool.cpp" ) serverOnlyFiles += Glob( "db/dbcommands*.cpp" ) if usesm: diff --git a/dbtests/threadedtests.cpp b/dbtests/threadedtests.cpp index 0e6d9a4bfdf..f3ebe3999f0 100644 --- a/dbtests/threadedtests.cpp +++ b/dbtests/threadedtests.cpp @@ -19,6 +19,7 @@ #include "stdafx.h" #include "../util/mvar.h" +#include "../util/thread_pool.h" #include <boost/thread.hpp> #include <boost/bind.hpp> @@ -94,6 +95,32 @@ namespace ThreadedTests { } }; + class ThreadPoolTest{ + static const int iterations = 10000; + static const int nThreads = 8; + + WrappingInt counter; + void increment(int n){ + for (int i=0; i<n; i++){ + counter.atomicIncrement(); + } + } + + public: + void run(){ + ThreadPool tp(nThreads); + + for (int i=0; i < iterations; i++){ + tp.schedule(&WrappingInt::atomicIncrement, &counter); + tp.schedule(&ThreadPoolTest::increment, this, 2); + } + + tp.join(); + + ASSERT(counter == (unsigned)(iterations * 3)); + } + }; + class All : public Suite { public: All() : Suite( "threading" ){ @@ -102,6 +129,7 @@ namespace ThreadedTests { void setupTests(){ add< IsWrappingIntAtomic >(); add< MVarTest >(); + add< ThreadPoolTest >(); } } myall; } diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp new file mode 100644 index 00000000000..64720b9761e --- /dev/null +++ b/util/thread_pool.cpp @@ -0,0 +1,138 @@ +/* threadpool.cpp +*/ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../stdafx.h" +#include "thread_pool.h" +#include "util/mvar.h" + + +namespace mongo{ +namespace threadpool{ + +// Worker thread +struct Worker : boost::noncopyable{ + explicit Worker(ThreadPool& owner) + : _owner(owner) + , _is_done(true) + , _thread(boost::bind(&Worker::loop, this)) + {} + + // destructor will block until current operation is completed + // Acts as a "join" on this thread + ~Worker(){ + _task.put(Task()); + _thread.join(); + } + + void set_task(Task& func){ + assert(!func.empty()); + assert(_is_done); + _is_done = false; + + _task.put(func); + } + + private: + ThreadPool& _owner; + MVar<Task> _task; + bool _is_done; // only used for error detection + boost::thread _thread; + + void loop(){ + while (true) { + Task task = _task.take(); + if (task.empty()) + break; // ends the thread + + try { + task(); + } catch (std::exception e){ + log() << "Unhandled exception in worker thread: " << e.what() << endl;; + } catch (...){ + log() << "Unhandled non-exception in worker thread" << endl; + } + _is_done = true; + _owner.task_done(this); + } + } +}; + +ThreadPool::ThreadPool(int nThreads) + : _tasksRemaining(0) + , _nThreads(nThreads) +{ + boostlock lock(_mutex); + while (nThreads-- > 0){ + Worker* worker = new Worker(*this); + _freeWorkers.push_front(worker); + } +} + +ThreadPool::~ThreadPool(){ + join(); + + assert(_tasks.empty()); + + // O(n) but n should be small + assert(_freeWorkers.size() == (unsigned)_nThreads); + + while(!_freeWorkers.empty()){ + delete _freeWorkers.front(); + _freeWorkers.pop_front(); + } +} + +void ThreadPool::join(){ + while(_tasksRemaining){ + boostlock lock(_mutex); + _condition.wait(lock); + } +} + +void ThreadPool::schedule(Task task){ + boostlock lock(_mutex); + + _tasksRemaining++; + + if (!_freeWorkers.empty()){ + _freeWorkers.front()->set_task(task); + _freeWorkers.pop_front(); + }else{ + _tasks.push_back(task); + } +} + +// should only be called by a worker from the worker thread +void ThreadPool::task_done(Worker* worker){ + boostlock lock(_mutex); + + if (!_tasks.empty()){ + worker->set_task(_tasks.front()); + _tasks.pop_front(); + }else{ + _freeWorkers.push_back(worker); + } + + _tasksRemaining--; + + if(_tasksRemaining == 0) + _condition.notify_all(); +} + +} //namespace threadpool +} //namespace mongo diff --git a/util/thread_pool.h b/util/thread_pool.h new file mode 100644 index 00000000000..a6e511faa65 --- /dev/null +++ b/util/thread_pool.h @@ -0,0 +1,83 @@ +// thread_pool.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/function.hpp> +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_list.hpp> +#undef assert +#define assert xassert + +namespace mongo { + +namespace threadpool { + class Worker; + + typedef boost::function<void(void)> Task; //nullary function or functor + + // exported to the mongo namespace + class ThreadPool : boost::noncopyable{ + public: + explicit ThreadPool(int nThreads=8); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // You should not call schedule while in the destructor + ~ThreadPool(); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // does not prevent new tasks from being scheduled so could wait forever. + // Also, new tasks could be scheduled after this returns. + void join(); + + + // task will be copied a few times so make sure it's relatively cheap + void schedule(Task task); + + // Helpers that wrap schedule and boost::bind. + // Functor and args will be copied a few times so make sure it's relatively cheap + template<typename F, typename A> + void schedule(F f, A a){ schedule(boost::bind(f,a)); } + template<typename F, typename A, typename B> + void schedule(F f, A a, B b){ schedule(boost::bind(f,a,b)); } + template<typename F, typename A, typename B, typename C> + void schedule(F f, A a, B b, C c){ schedule(boost::bind(f,a,b,c)); } + template<typename F, typename A, typename B, typename C, typename D> + void schedule(F f, A a, B b, C c, D d){ schedule(boost::bind(f,a,b,c,d)); } + template<typename F, typename A, typename B, typename C, typename D, typename E> + void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); } + + + int tasks_remaining() { return _tasksRemaining; } + + private: + boost::mutex _mutex; + boost::condition _condition; + + list<Worker*> _freeWorkers; //used as LIFO stack (always front) + list<Task> _tasks; //used as FIFO queue (push_back, pop_front) + int _tasksRemaining; // in queue + currently processing + int _nThreads; // only used for sanity checking. could be removed in the future. + + // should only be called by a worker from the worker's thread + void task_done(Worker* worker); + friend class Worker; + }; + +} //namespace threadpool + +using threadpool::ThreadPool; + +} //namespace mongo |