diff options
author | Eliot Horowitz <eliot@10gen.com> | 2011-01-04 00:40:41 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2011-01-04 00:40:41 -0500 |
commit | b828d21630d8715fff5a30c682a51ab79880093d (patch) | |
tree | cbbc46069dcfc08ab1525ec06a5dff5967dde148 /util/concurrency | |
parent | 4315a900ae604e11f2d9d68d1e6f87b8aa01dddc (diff) | |
download | mongo-b828d21630d8715fff5a30c682a51ab79880093d.tar.gz |
ran astyle SERVER-2304
Diffstat (limited to 'util/concurrency')
-rw-r--r-- | util/concurrency/list.h | 96 | ||||
-rw-r--r-- | util/concurrency/msg.h | 6 | ||||
-rw-r--r-- | util/concurrency/mutex.h | 58 | ||||
-rw-r--r-- | util/concurrency/mvar.h | 28 | ||||
-rw-r--r-- | util/concurrency/rwlock.h | 86 | ||||
-rw-r--r-- | util/concurrency/spin_lock.cpp | 12 | ||||
-rw-r--r-- | util/concurrency/spin_lock.h | 2 | ||||
-rw-r--r-- | util/concurrency/synchronization.cpp | 8 | ||||
-rw-r--r-- | util/concurrency/synchronization.h | 6 | ||||
-rw-r--r-- | util/concurrency/task.cpp | 51 | ||||
-rw-r--r-- | util/concurrency/task.h | 10 | ||||
-rw-r--r-- | util/concurrency/thread_pool.cpp | 45 | ||||
-rw-r--r-- | util/concurrency/thread_pool.h | 108 | ||||
-rw-r--r-- | util/concurrency/value.h | 24 | ||||
-rw-r--r-- | util/concurrency/vars.cpp | 22 |
15 files changed, 284 insertions, 278 deletions
diff --git a/util/concurrency/list.h b/util/concurrency/list.h index 58b38ac63bd..e5eaec63bec 100644 --- a/util/concurrency/list.h +++ b/util/concurrency/list.h @@ -18,64 +18,64 @@ #pragma once -namespace mongo { +namespace mongo { -/* this class uses a mutex for writes, but not for reads. - we can get fancier later... + /* this class uses a mutex for writes, but not for reads. + we can get fancier later... - struct Member : public List1<Member>::Base { - const char *host; - int port; - }; - List1<Member> _members; - _members.head()->next(); + struct Member : public List1<Member>::Base { + const char *host; + int port; + }; + List1<Member> _members; + _members.head()->next(); -*/ -template<typename T> -class List1 : boost::noncopyable { -public: - /* next() and head() return 0 at end of list */ + */ + template<typename T> + class List1 : boost::noncopyable { + public: + /* next() and head() return 0 at end of list */ - List1() : _head(0), _m("List1"), _orphans(0) { } + List1() : _head(0), _m("List1"), _orphans(0) { } - class Base { - friend class List1; - T *_next; - public: - T* next() const { return _next; } - }; + class Base { + friend class List1; + T *_next; + public: + T* next() const { return _next; } + }; - T* head() const { return _head; } + T* head() const { return _head; } - void push(T* t) { - scoped_lock lk(_m); - t->_next = _head; - _head = t; - } + void push(T* t) { + scoped_lock lk(_m); + t->_next = _head; + _head = t; + } - // intentionally leak. - void orphanAll() { - _head = 0; - } + // intentionally leak. + void orphanAll() { + _head = 0; + } - /* t is not deleted, but is removed from the list. (orphaned) */ - void orphan(T* t) { - scoped_lock lk(_m); - T *&prev = _head; - T *n = prev; - while( n != t ) { - prev = n->_next; - n = prev; + /* t is not deleted, but is removed from the list. (orphaned) */ + void orphan(T* t) { + scoped_lock lk(_m); + T *&prev = _head; + T *n = prev; + while( n != t ) { + prev = n->_next; + n = prev; + } + prev = t->_next; + if( ++_orphans > 500 ) + log() << "warning orphans=" << _orphans << '\n'; } - prev = t->_next; - if( ++_orphans > 500 ) - log() << "warning orphans=" << _orphans << '\n'; - } -private: - T *_head; - mongo::mutex _m; - int _orphans; -}; + private: + T *_head; + mongo::mutex _m; + int _orphans; + }; }; diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h index aa657dc053e..f7c6788dadc 100644 --- a/util/concurrency/msg.h +++ b/util/concurrency/msg.h @@ -21,14 +21,14 @@ #include <deque> #include "task.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { typedef boost::function<void()> lam; /** typical usage is: task::fork( new Server("threadname") ); */ - class Server : public Task { + class Server : public Task { public: /** send a message to the port */ void send(lam); diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h index 40205ddccd0..a8a84220e2b 100644 --- a/util/concurrency/mutex.h +++ b/util/concurrency/mutex.h @@ -22,11 +22,11 @@ #include "../heapcheck.h" -namespace mongo { +namespace mongo { class mutex; - inline boost::xtime incxtimemillis( long long s ){ + inline boost::xtime incxtimemillis( long long s ) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += (int)( s / 1000 ); @@ -34,7 +34,7 @@ namespace mongo { if ( xt.nsec >= 1000000000 ) { xt.nsec -= 1000000000; xt.sec++; - } + } return xt; } @@ -42,7 +42,7 @@ namespace mongo { MutexDebugger checks that we always acquire locks for multiple mutexes in a consistant (acyclic) order. If we were inconsistent we could deadlock. */ - class MutexDebugger { + class MutexDebugger { typedef const char * mid; // mid = mutex ID typedef map<mid,int> Preceeding; map< mid, int > maxNest; @@ -55,12 +55,12 @@ namespace mongo { public: // set these to create an assert that // b must never be locked before a - // so + // so // a.lock(); b.lock(); is fine // b.lock(); alone is fine too // only checked on _DEBUG builds. string a,b; - + /** outputs some diagnostic info on mutexes (on _DEBUG builds) */ void programEnding(); @@ -75,7 +75,7 @@ namespace mongo { us.reset( _preceeding = new Preceeding() ); Preceeding &preceeding = *_preceeding; - if( a == m ) { + if( a == m ) { aBreakPoint(); if( preceeding[b.c_str()] ) { cout << "****** MutexDebugger error! warning " << b << " was locked before " << a << endl; @@ -84,7 +84,7 @@ namespace mongo { } preceeding[m]++; - if( preceeding[m] > 1 ) { + if( preceeding[m] > 1 ) { // recursive re-locking. if( preceeding[m] > maxNest[m] ) maxNest[m] = preceeding[m]; @@ -96,19 +96,19 @@ namespace mongo { { boost::mutex::scoped_lock lk(x); followers[m]; - for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { if( m != i->first && i->second > 0 ) { followers[i->first].insert(m); - if( followers[m].count(i->first) != 0 ){ + if( followers[m].count(i->first) != 0 ) { failed = true; stringstream ss; mid bad = i->first; ss << "mutex problem" << - "\n when locking " << m << - "\n " << bad << " was already locked and should not be." - "\n set a and b above to debug.\n"; + "\n when locking " << m << + "\n " << bad << " was already locked and should not be." + "\n set a and b above to debug.\n"; stringstream q; - for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { if( i->first != m && i->first != bad && i->second > 0 ) q << " " << i->first << '\n'; } @@ -126,7 +126,7 @@ namespace mongo { assert( 0 ); } } - void leaving(mid m) { + void leaving(mid m) { if( this == 0 ) return; // still in startup pre-main() Preceeding& preceeding = *us.get(); preceeding[m]--; @@ -137,7 +137,7 @@ namespace mongo { } }; extern MutexDebugger &mutexDebugger; - + // If you create a local static instance of this class, that instance will be destroyed // before all global static objects are destroyed, so _destroyingStatics will be set // to true before the global static variables are destroyed. @@ -157,13 +157,13 @@ namespace mongo { #endif #if defined(_DEBUG) - mutex(const char *name) - : _name(name) + mutex(const char *name) + : _name(name) #else - mutex(const char *) + mutex(const char *) #endif - { - _m = new boost::timed_mutex(); + { + _m = new boost::timed_mutex(); IGNORE_OBJECT( _m ); // Turn-off heap checking on _m } ~mutex() { @@ -172,22 +172,22 @@ namespace mongo { delete _m; } } - + class try_lock : boost::noncopyable { public: - try_lock( mongo::mutex &m , int millis = 0 ) - : _l( m.boost() , incxtimemillis( millis ) ) , + try_lock( mongo::mutex &m , int millis = 0 ) + : _l( m.boost() , incxtimemillis( millis ) ) , #if BOOST_VERSION >= 103500 - ok( _l.owns_lock() ) + ok( _l.owns_lock() ) #else ok( _l.locked() ) #endif { } - ~try_lock() { + ~try_lock() { } - + private: boost::timed_mutex::scoped_timed_lock _l; @@ -207,7 +207,7 @@ namespace mongo { mutexDebugger.entering(mut->_name); #endif } - ~scoped_lock() { + ~scoped_lock() { #if defined(_DEBUG) mutexDebugger.leaving(mut->_name); #endif @@ -223,7 +223,7 @@ namespace mongo { boost::timed_mutex &boost() { return *_m; } boost::timed_mutex *_m; }; - + typedef mutex::scoped_lock scoped_lock; typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; diff --git a/util/concurrency/mvar.h b/util/concurrency/mvar.h index 7d17051368e..9c7a505b6d5 100644 --- a/util/concurrency/mvar.h +++ b/util/concurrency/mvar.h @@ -31,18 +31,18 @@ namespace mongo { // create an empty MVar MVar() - : _state(EMPTY) + : _state(EMPTY) {} // creates a full MVar MVar(const T& val) - : _state(FULL) - , _value(val) + : _state(FULL) + , _value(val) {} // puts val into the MVar and returns true or returns false if full // never blocks - bool tryPut(const T& val){ + bool tryPut(const T& val) { // intentionally repeat test before and after lock if (_state == FULL) return false; Mutex::scoped_lock lock(_mutex); @@ -59,17 +59,17 @@ namespace mongo { // puts val into the MVar // will block if the MVar is already full - void put(const T& val){ + void put(const T& val) { Mutex::scoped_lock lock(_mutex); - while (!tryPut(val)){ - // unlocks lock while waiting and relocks before returning + while (!tryPut(val)) { + // unlocks lock while waiting and relocks before returning _condition.wait(lock); - } + } } // takes val out of the MVar and returns true or returns false if empty // never blocks - bool tryTake(T& out){ + bool tryTake(T& out) { // intentionally repeat test before and after lock if (_state == EMPTY) return false; Mutex::scoped_lock lock(_mutex); @@ -86,14 +86,14 @@ namespace mongo { // takes val out of the MVar // will block if the MVar is empty - T take(){ + T take() { T ret = T(); Mutex::scoped_lock lock(_mutex); - while (!tryTake(ret)){ - // unlocks lock while waiting and relocks before returning + while (!tryTake(ret)) { + // unlocks lock while waiting and relocks before returning _condition.wait(lock); - } + } return ret; } @@ -102,7 +102,7 @@ namespace mongo { // Note: this is fast because there is no locking, but state could // change before you get a chance to act on it. // Mainly useful for sanity checks / asserts. - State getState(){ return _state; } + State getState() { return _state; } private: diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h index 2364b3a39b9..c9429c5b76d 100644 --- a/util/concurrency/rwlock.h +++ b/util/concurrency/rwlock.h @@ -22,14 +22,14 @@ #include "../time_support.h" #if BOOST_VERSION >= 103500 - #define BOOST_RWLOCK +#define BOOST_RWLOCK #else - #if defined(_WIN32) - #error need boost >= 1.35 for windows - #endif - - #include <pthread.h> +#if defined(_WIN32) +#error need boost >= 1.35 for windows +#endif + +#include <pthread.h> #endif @@ -51,40 +51,40 @@ namespace mongo { #else RWLock(const char *) { } #endif - void lock(){ + void lock() { _m.lock(); #if defined(_DEBUG) mutexDebugger.entering(_name); #endif } - void unlock(){ + void unlock() { #if defined(_DEBUG) mutexDebugger.leaving(_name); #endif _m.unlock(); } - - void lock_shared(){ + + void lock_shared() { _m.lock_shared(); } - - void unlock_shared(){ + + void unlock_shared() { _m.unlock_shared(); } - bool lock_shared_try( int millis ){ + bool lock_shared_try( int millis ) { boost::system_time until = get_system_time(); until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock_shared( until ) ) { + if( _m.timed_lock_shared( until ) ) { return true; } return false; } - bool lock_try( int millis = 0 ){ + bool lock_try( int millis = 0 ) { boost::system_time until = get_system_time(); until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock( until ) ) { + if( _m.timed_lock( until ) ) { #if defined(_DEBUG) mutexDebugger.entering(_name); #endif @@ -99,7 +99,7 @@ namespace mongo { class RWLock { pthread_rwlock_t _lock; - inline void check( int x ){ + inline void check( int x ) { if( x == 0 ) return; log() << "pthread rwlock failed: " << x << endl; @@ -115,40 +115,40 @@ namespace mongo { #endif check( pthread_rwlock_init( &_lock , 0 ) ); } - - ~RWLock(){ - if ( ! StaticObserver::_destroyingStatics ){ + + ~RWLock() { + if ( ! StaticObserver::_destroyingStatics ) { check( pthread_rwlock_destroy( &_lock ) ); } } - void lock(){ + void lock() { check( pthread_rwlock_wrlock( &_lock ) ); #if defined(_DEBUG) mutexDebugger.entering(_name); #endif } - void unlock(){ + void unlock() { #if defined(_DEBUG) mutexDebugger.leaving(_name); #endif check( pthread_rwlock_unlock( &_lock ) ); } - - void lock_shared(){ + + void lock_shared() { check( pthread_rwlock_rdlock( &_lock ) ); } - - void unlock_shared(){ + + void unlock_shared() { check( pthread_rwlock_unlock( &_lock ) ); } - - bool lock_shared_try( int millis ){ + + bool lock_shared_try( int millis ) { return _try( millis , false ); } - bool lock_try( int millis = 0 ){ - if( _try( millis , true ) ) { + bool lock_try( int millis = 0 ) { + if( _try( millis , true ) ) { #if defined(_DEBUG) mutexDebugger.entering(_name); #endif @@ -157,31 +157,31 @@ namespace mongo { return false; } - bool _try( int millis , bool write ){ + bool _try( int millis , bool write ) { while ( true ) { - int x = write ? - pthread_rwlock_trywrlock( &_lock ) : - pthread_rwlock_tryrdlock( &_lock ); - + int x = write ? + pthread_rwlock_trywrlock( &_lock ) : + pthread_rwlock_tryrdlock( &_lock ); + if ( x <= 0 ) { return true; } - + if ( millis-- <= 0 ) return false; - - if ( x == EBUSY ){ + + if ( x == EBUSY ) { sleepmillis(1); continue; } check(x); - } - + } + return false; } }; - + #endif @@ -190,7 +190,7 @@ namespace mongo { public: struct exception { }; rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { - if( !l.lock_try(millis) ) + if( !l.lock_try(millis) ) throw exception(); } ~rwlock_try_write() { _l.unlock(); } @@ -216,7 +216,7 @@ namespace mongo { else _lock.unlock_shared(); } - private: + private: RWLock& _lock; const bool _write; }; diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp index 2e56acb47b3..0f33609d645 100644 --- a/util/concurrency/spin_lock.cpp +++ b/util/concurrency/spin_lock.cpp @@ -22,7 +22,7 @@ namespace mongo { - SpinLock::~SpinLock() { + SpinLock::~SpinLock() { #if defined(_WIN32) DeleteCriticalSection(&_cs); #endif @@ -30,14 +30,14 @@ namespace mongo { SpinLock::SpinLock() #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - : _locked( false ) { } + : _locked( false ) { } #elif defined(_WIN32) { InitializeCriticalSectionAndSpinCount(&_cs, 4000); } #else : _mutex( "SpinLock" ) { } #endif - void SpinLock::lock(){ + void SpinLock::lock() { #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // fast path if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { @@ -65,17 +65,17 @@ namespace mongo { #endif } - void SpinLock::unlock(){ + void SpinLock::unlock() { #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) __sync_lock_release(&_locked); -#elif defined(WIN32) +#elif defined(WIN32) LeaveCriticalSection(&_cs); #else - + _mutex.unlock(); #endif diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h index 7324a6e4f32..d5360f7b3c6 100644 --- a/util/concurrency/spin_lock.h +++ b/util/concurrency/spin_lock.h @@ -48,7 +48,7 @@ namespace mongo { // Non-copyable, non-assignable SpinLock(SpinLock&); SpinLock& operator=(SpinLock&); - }; + }; } // namespace mongo diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp index 5639c888a32..12e2894ef1a 100644 --- a/util/concurrency/synchronization.cpp +++ b/util/concurrency/synchronization.cpp @@ -22,15 +22,15 @@ namespace mongo { Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { } - Notification::~Notification(){ } + Notification::~Notification() { } - void Notification::waitToBeNotified(){ + void Notification::waitToBeNotified() { scoped_lock lock( _mutex ); while ( ! _notified ) _condition.wait( lock.boost() ); } - void Notification::notifyOne(){ + void Notification::notifyOne() { scoped_lock lock( _mutex ); assert( !_notified ); _notified = true; @@ -38,7 +38,7 @@ namespace mongo { } NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { } - + void NotifyAll::wait() { scoped_lock lock( _mutex ); unsigned long long old = _counter; diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h index c2e70cabe19..ac2fcabcb86 100644 --- a/util/concurrency/synchronization.h +++ b/util/concurrency/synchronization.h @@ -52,12 +52,12 @@ namespace mongo { /** establishes a synchronization point between threads. N threads are waits and one is notifier. threadsafe. */ - class NotifyAll : boost::noncopyable { + class NotifyAll : boost::noncopyable { public: NotifyAll(); - /** awaits the next notifyAll() call by another thread. notifications that precede this - call are ignored -- we are looking for a fresh event. + /** awaits the next notifyAll() call by another thread. notifications that precede this + call are ignored -- we are looking for a fresh event. */ void wait(); diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp index 20801f10217..d84cd71dceb 100644 --- a/util/concurrency/task.cpp +++ b/util/concurrency/task.cpp @@ -25,11 +25,11 @@ #include "../unittest.h" #include "../time_support.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { - /*void foo() { + /*void foo() { boost::mutex m; boost::mutex::scoped_lock lk(m); boost::condition cond; @@ -37,21 +37,21 @@ namespace mongo { cond.notify_one(); }*/ - Task::Task() - : BackgroundJob( true /* deleteSelf */ ) { + Task::Task() + : BackgroundJob( true /* deleteSelf */ ) { n = 0; repeat = 0; } void Task::halt() { repeat = 0; } - void Task::run() { + void Task::run() { assert( n == 0 ); while( 1 ) { n++; - try { + try { doWork(); - } + } catch(...) { } if( repeat == 0 ) break; @@ -65,11 +65,11 @@ namespace mongo { go(); } - void fork(Task *t) { + void fork(Task *t) { t->begin(); } - void repeat(Task *t, unsigned millis) { + void repeat(Task *t, unsigned millis) { t->repeat = millis; t->begin(); } @@ -110,7 +110,7 @@ namespace mongo { } } - void Server::send( lam msg ) { + void Server::send( lam msg ) { { boost::mutex::scoped_lock lk(m); d.push_back(msg); @@ -118,9 +118,9 @@ namespace mongo { c.notify_one(); } - void Server::doWork() { + void Server::doWork() { starting(); - while( 1 ) { + while( 1 ) { lam f; try { boost::mutex::scoped_lock lk(m); @@ -129,7 +129,7 @@ namespace mongo { f = d.front(); d.pop_front(); } - catch(...) { + catch(...) { log() << "ERROR exception in Server:doWork?" << endl; } try { @@ -141,27 +141,28 @@ namespace mongo { d.push_back(f); } } - } catch(std::exception& e) { - log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl; - } - catch(const char *p) { - log() << "Server::doWork task:" << name() << " unknown c exception:" << - ((p&&strlen(p)<800)?p:"?") << endl; - } - catch(...) { - log() << "Server::doWork unknown exception task:" << name() << endl; + } + catch(std::exception& e) { + log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl; + } + catch(const char *p) { + log() << "Server::doWork task:" << name() << " unknown c exception:" << + ((p&&strlen(p)<800)?p:"?") << endl; + } + catch(...) { + log() << "Server::doWork unknown exception task:" << name() << endl; } } } static Server *s; - static void abc(int i) { + static void abc(int i) { cout << "Hello " << i << endl; s->requeue(); } class TaskUnitTest : public mongo::UnitTest { public: - virtual void run() { + virtual void run() { lam f = boost::bind(abc, 3); //f(); diff --git a/util/concurrency/task.h b/util/concurrency/task.h index 654ecd35fd2..d7b45eeef24 100644 --- a/util/concurrency/task.h +++ b/util/concurrency/task.h @@ -20,9 +20,9 @@ #include "../background.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { /** abstraction around threads. simpler than BackgroundJob which is used behind the scenes. allocate the Task dynamically. when the thread terminates, the Task object will delete itself. @@ -34,7 +34,7 @@ namespace mongo { public: Task(); - /** for a repeating task, stop after current invocation ends. can be called by other threads + /** for a repeating task, stop after current invocation ends. can be called by other threads as long as the Task is still in scope. */ void halt(); @@ -54,8 +54,8 @@ namespace mongo { void repeat(Task *t, unsigned millis); /*** Example *** - inline void sample() { - class Sample : public Task { + inline void sample() { + class Sample : public Task { public: int result; virtual void doWork() { result = 1234; } diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp index 2caac1ff3f3..1c258847cb5 100644 --- a/util/concurrency/thread_pool.cpp +++ b/util/concurrency/thread_pool.cpp @@ -20,8 +20,8 @@ #include "thread_pool.h" #include "mvar.h" -namespace mongo{ - namespace threadpool{ +namespace mongo { + namespace threadpool { // Worker thread class Worker : boost::noncopyable { @@ -34,12 +34,12 @@ namespace mongo{ // destructor will block until current operation is completed // Acts as a "join" on this thread - ~Worker(){ + ~Worker() { _task.put(Task()); _thread.join(); } - void set_task(Task& func){ + void set_task(Task& func) { assert(!func.empty()); assert(_is_done); _is_done = false; @@ -47,13 +47,13 @@ namespace mongo{ _task.put(func); } - private: + private: ThreadPool& _owner; MVar<Task> _task; bool _is_done; // only used for error detection boost::thread _thread; - void loop(){ + void loop() { while (true) { Task task = _task.take(); if (task.empty()) @@ -61,9 +61,11 @@ namespace mongo{ try { task(); - } catch (std::exception e){ + } + catch (std::exception e) { log() << "Unhandled exception in worker thread: " << e.what() << endl;; - } catch (...){ + } + catch (...) { log() << "Unhandled non-exception in worker thread" << endl; } _is_done = true; @@ -74,16 +76,15 @@ namespace mongo{ ThreadPool::ThreadPool(int nThreads) : _mutex("ThreadPool"), _tasksRemaining(0) - , _nThreads(nThreads) - { + , _nThreads(nThreads) { scoped_lock lock(_mutex); - while (nThreads-- > 0){ + while (nThreads-- > 0) { Worker* worker = new Worker(*this); _freeWorkers.push_front(worker); } } - ThreadPool::~ThreadPool(){ + ThreadPool::~ThreadPool() { join(); assert(_tasks.empty()); @@ -91,40 +92,42 @@ namespace mongo{ // O(n) but n should be small assert(_freeWorkers.size() == (unsigned)_nThreads); - while(!_freeWorkers.empty()){ + while(!_freeWorkers.empty()) { delete _freeWorkers.front(); _freeWorkers.pop_front(); } } - void ThreadPool::join(){ + void ThreadPool::join() { scoped_lock lock(_mutex); - while(_tasksRemaining){ + while(_tasksRemaining) { _condition.wait(lock.boost()); } } - void ThreadPool::schedule(Task task){ + void ThreadPool::schedule(Task task) { scoped_lock lock(_mutex); _tasksRemaining++; - if (!_freeWorkers.empty()){ + if (!_freeWorkers.empty()) { _freeWorkers.front()->set_task(task); _freeWorkers.pop_front(); - }else{ + } + else { _tasks.push_back(task); } } // should only be called by a worker from the worker thread - void ThreadPool::task_done(Worker* worker){ + void ThreadPool::task_done(Worker* worker) { scoped_lock lock(_mutex); - if (!_tasks.empty()){ + if (!_tasks.empty()) { worker->set_task(_tasks.front()); _tasks.pop_front(); - }else{ + } + else { _freeWorkers.push_front(worker); } diff --git a/util/concurrency/thread_pool.h b/util/concurrency/thread_pool.h index 31e06430088..b348ed1d01b 100644 --- a/util/concurrency/thread_pool.h +++ b/util/concurrency/thread_pool.h @@ -24,59 +24,59 @@ namespace mongo { -namespace threadpool { - class Worker; - - typedef boost::function<void(void)> Task; //nullary function or functor - - // exported to the mongo namespace - class ThreadPool : boost::noncopyable{ - public: - explicit ThreadPool(int nThreads=8); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // You should not call schedule while in the destructor - ~ThreadPool(); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // does not prevent new tasks from being scheduled so could wait forever. - // Also, new tasks could be scheduled after this returns. - void join(); - - // task will be copied a few times so make sure it's relatively cheap - void schedule(Task task); - - // Helpers that wrap schedule and boost::bind. - // Functor and args will be copied a few times so make sure it's relatively cheap - template<typename F, typename A> - void schedule(F f, A a){ schedule(boost::bind(f,a)); } - template<typename F, typename A, typename B> - void schedule(F f, A a, B b){ schedule(boost::bind(f,a,b)); } - template<typename F, typename A, typename B, typename C> - void schedule(F f, A a, B b, C c){ schedule(boost::bind(f,a,b,c)); } - template<typename F, typename A, typename B, typename C, typename D> - void schedule(F f, A a, B b, C c, D d){ schedule(boost::bind(f,a,b,c,d)); } - template<typename F, typename A, typename B, typename C, typename D, typename E> - void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); } - - int tasks_remaining() { return _tasksRemaining; } - - private: - mongo::mutex _mutex; - boost::condition _condition; - - list<Worker*> _freeWorkers; //used as LIFO stack (always front) - list<Task> _tasks; //used as FIFO queue (push_back, pop_front) - int _tasksRemaining; // in queue + currently processing - int _nThreads; // only used for sanity checking. could be removed in the future. - - // should only be called by a worker from the worker's thread - void task_done(Worker* worker); - friend class Worker; - }; - -} //namespace threadpool - -using threadpool::ThreadPool; + namespace threadpool { + class Worker; + + typedef boost::function<void(void)> Task; //nullary function or functor + + // exported to the mongo namespace + class ThreadPool : boost::noncopyable { + public: + explicit ThreadPool(int nThreads=8); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // You should not call schedule while in the destructor + ~ThreadPool(); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // does not prevent new tasks from being scheduled so could wait forever. + // Also, new tasks could be scheduled after this returns. + void join(); + + // task will be copied a few times so make sure it's relatively cheap + void schedule(Task task); + + // Helpers that wrap schedule and boost::bind. + // Functor and args will be copied a few times so make sure it's relatively cheap + template<typename F, typename A> + void schedule(F f, A a) { schedule(boost::bind(f,a)); } + template<typename F, typename A, typename B> + void schedule(F f, A a, B b) { schedule(boost::bind(f,a,b)); } + template<typename F, typename A, typename B, typename C> + void schedule(F f, A a, B b, C c) { schedule(boost::bind(f,a,b,c)); } + template<typename F, typename A, typename B, typename C, typename D> + void schedule(F f, A a, B b, C c, D d) { schedule(boost::bind(f,a,b,c,d)); } + template<typename F, typename A, typename B, typename C, typename D, typename E> + void schedule(F f, A a, B b, C c, D d, E e) { schedule(boost::bind(f,a,b,c,d,e)); } + + int tasks_remaining() { return _tasksRemaining; } + + private: + mongo::mutex _mutex; + boost::condition _condition; + + list<Worker*> _freeWorkers; //used as LIFO stack (always front) + list<Task> _tasks; //used as FIFO queue (push_back, pop_front) + int _tasksRemaining; // in queue + currently processing + int _nThreads; // only used for sanity checking. could be removed in the future. + + // should only be called by a worker from the worker's thread + void task_done(Worker* worker); + friend class Worker; + }; + + } //namespace threadpool + + using threadpool::ThreadPool; } //namespace mongo diff --git a/util/concurrency/value.h b/util/concurrency/value.h index dabeb956e43..08d53062bf6 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -20,11 +20,11 @@ #pragma once -namespace mongo { +namespace mongo { extern mutex _atomicMutex; - /** atomic wrapper for a value. enters a mutex on each access. must + /** atomic wrapper for a value. enters a mutex on each access. must be copyable. */ template<typename T> @@ -33,20 +33,22 @@ namespace mongo { public: Atomic<T>() { } - void operator=(const T& a) { + void operator=(const T& a) { scoped_lock lk(_atomicMutex); - val = a; } + val = a; + } - operator T() const { + operator T() const { scoped_lock lk(_atomicMutex); - return val; } - + return val; + } + /** example: Atomic<int> q; ... { Atomic<int>::tran t(q); - if( q.ref() > 0 ) + if( q.ref() > 0 ) q.ref()--; } */ @@ -58,11 +60,11 @@ namespace mongo { }; }; - /** this string COULD be mangled but with the double buffering, assuming writes - are infrequent, it's unlikely. thus, this is reasonable for lockless setting of + /** this string COULD be mangled but with the double buffering, assuming writes + are infrequent, it's unlikely. thus, this is reasonable for lockless setting of diagnostic strings, where their content isn't critical. */ - class DiagStr { + class DiagStr { char buf1[256]; char buf2[256]; char *p; diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 0bf52ec048e..3d057a4801e 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -20,28 +20,28 @@ #include "value.h" #include "mutex.h" -namespace mongo { +namespace mongo { mongo::mutex _atomicMutex("_atomicMutex"); // intentional leak. otherwise destructor orders can be problematic at termination. MutexDebugger &mutexDebugger = *(new MutexDebugger()); - MutexDebugger::MutexDebugger() : - x( *(new boost::mutex()) ), magic(0x12345678) { - // optional way to debug lock order - /* - a = "a_lock"; - b = "b_lock"; - */ + MutexDebugger::MutexDebugger() : + x( *(new boost::mutex()) ), magic(0x12345678) { + // optional way to debug lock order + /* + a = "a_lock"; + b = "b_lock"; + */ } - void MutexDebugger::programEnding() { + void MutexDebugger::programEnding() { if( logLevel>=1 && followers.size() ) { std::cout << followers.size() << " mutexes in program" << endl; - for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) { + for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) { cout << i->first; - if( maxNest[i->first] > 1 ) + if( maxNest[i->first] > 1 ) cout << " maxNest:" << maxNest[i->first]; cout << '\n'; for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ ) |