summaryrefslogtreecommitdiff
path: root/util/concurrency
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2011-01-04 00:40:41 -0500
committerEliot Horowitz <eliot@10gen.com>2011-01-04 00:40:41 -0500
commitb828d21630d8715fff5a30c682a51ab79880093d (patch)
treecbbc46069dcfc08ab1525ec06a5dff5967dde148 /util/concurrency
parent4315a900ae604e11f2d9d68d1e6f87b8aa01dddc (diff)
downloadmongo-b828d21630d8715fff5a30c682a51ab79880093d.tar.gz
ran astyle SERVER-2304
Diffstat (limited to 'util/concurrency')
-rw-r--r--util/concurrency/list.h96
-rw-r--r--util/concurrency/msg.h6
-rw-r--r--util/concurrency/mutex.h58
-rw-r--r--util/concurrency/mvar.h28
-rw-r--r--util/concurrency/rwlock.h86
-rw-r--r--util/concurrency/spin_lock.cpp12
-rw-r--r--util/concurrency/spin_lock.h2
-rw-r--r--util/concurrency/synchronization.cpp8
-rw-r--r--util/concurrency/synchronization.h6
-rw-r--r--util/concurrency/task.cpp51
-rw-r--r--util/concurrency/task.h10
-rw-r--r--util/concurrency/thread_pool.cpp45
-rw-r--r--util/concurrency/thread_pool.h108
-rw-r--r--util/concurrency/value.h24
-rw-r--r--util/concurrency/vars.cpp22
15 files changed, 284 insertions, 278 deletions
diff --git a/util/concurrency/list.h b/util/concurrency/list.h
index 58b38ac63bd..e5eaec63bec 100644
--- a/util/concurrency/list.h
+++ b/util/concurrency/list.h
@@ -18,64 +18,64 @@
#pragma once
-namespace mongo {
+namespace mongo {
-/* this class uses a mutex for writes, but not for reads.
- we can get fancier later...
+ /* this class uses a mutex for writes, but not for reads.
+ we can get fancier later...
- struct Member : public List1<Member>::Base {
- const char *host;
- int port;
- };
- List1<Member> _members;
- _members.head()->next();
+ struct Member : public List1<Member>::Base {
+ const char *host;
+ int port;
+ };
+ List1<Member> _members;
+ _members.head()->next();
-*/
-template<typename T>
-class List1 : boost::noncopyable {
-public:
- /* next() and head() return 0 at end of list */
+ */
+ template<typename T>
+ class List1 : boost::noncopyable {
+ public:
+ /* next() and head() return 0 at end of list */
- List1() : _head(0), _m("List1"), _orphans(0) { }
+ List1() : _head(0), _m("List1"), _orphans(0) { }
- class Base {
- friend class List1;
- T *_next;
- public:
- T* next() const { return _next; }
- };
+ class Base {
+ friend class List1;
+ T *_next;
+ public:
+ T* next() const { return _next; }
+ };
- T* head() const { return _head; }
+ T* head() const { return _head; }
- void push(T* t) {
- scoped_lock lk(_m);
- t->_next = _head;
- _head = t;
- }
+ void push(T* t) {
+ scoped_lock lk(_m);
+ t->_next = _head;
+ _head = t;
+ }
- // intentionally leak.
- void orphanAll() {
- _head = 0;
- }
+ // intentionally leak.
+ void orphanAll() {
+ _head = 0;
+ }
- /* t is not deleted, but is removed from the list. (orphaned) */
- void orphan(T* t) {
- scoped_lock lk(_m);
- T *&prev = _head;
- T *n = prev;
- while( n != t ) {
- prev = n->_next;
- n = prev;
+ /* t is not deleted, but is removed from the list. (orphaned) */
+ void orphan(T* t) {
+ scoped_lock lk(_m);
+ T *&prev = _head;
+ T *n = prev;
+ while( n != t ) {
+ prev = n->_next;
+ n = prev;
+ }
+ prev = t->_next;
+ if( ++_orphans > 500 )
+ log() << "warning orphans=" << _orphans << '\n';
}
- prev = t->_next;
- if( ++_orphans > 500 )
- log() << "warning orphans=" << _orphans << '\n';
- }
-private:
- T *_head;
- mongo::mutex _m;
- int _orphans;
-};
+ private:
+ T *_head;
+ mongo::mutex _m;
+ int _orphans;
+ };
};
diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h
index aa657dc053e..f7c6788dadc 100644
--- a/util/concurrency/msg.h
+++ b/util/concurrency/msg.h
@@ -21,14 +21,14 @@
#include <deque>
#include "task.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
typedef boost::function<void()> lam;
/** typical usage is: task::fork( new Server("threadname") ); */
- class Server : public Task {
+ class Server : public Task {
public:
/** send a message to the port */
void send(lam);
diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h
index 40205ddccd0..a8a84220e2b 100644
--- a/util/concurrency/mutex.h
+++ b/util/concurrency/mutex.h
@@ -22,11 +22,11 @@
#include "../heapcheck.h"
-namespace mongo {
+namespace mongo {
class mutex;
- inline boost::xtime incxtimemillis( long long s ){
+ inline boost::xtime incxtimemillis( long long s ) {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
xt.sec += (int)( s / 1000 );
@@ -34,7 +34,7 @@ namespace mongo {
if ( xt.nsec >= 1000000000 ) {
xt.nsec -= 1000000000;
xt.sec++;
- }
+ }
return xt;
}
@@ -42,7 +42,7 @@ namespace mongo {
MutexDebugger checks that we always acquire locks for multiple mutexes in a consistant (acyclic) order.
If we were inconsistent we could deadlock.
*/
- class MutexDebugger {
+ class MutexDebugger {
typedef const char * mid; // mid = mutex ID
typedef map<mid,int> Preceeding;
map< mid, int > maxNest;
@@ -55,12 +55,12 @@ namespace mongo {
public:
// set these to create an assert that
// b must never be locked before a
- // so
+ // so
// a.lock(); b.lock(); is fine
// b.lock(); alone is fine too
// only checked on _DEBUG builds.
string a,b;
-
+
/** outputs some diagnostic info on mutexes (on _DEBUG builds) */
void programEnding();
@@ -75,7 +75,7 @@ namespace mongo {
us.reset( _preceeding = new Preceeding() );
Preceeding &preceeding = *_preceeding;
- if( a == m ) {
+ if( a == m ) {
aBreakPoint();
if( preceeding[b.c_str()] ) {
cout << "****** MutexDebugger error! warning " << b << " was locked before " << a << endl;
@@ -84,7 +84,7 @@ namespace mongo {
}
preceeding[m]++;
- if( preceeding[m] > 1 ) {
+ if( preceeding[m] > 1 ) {
// recursive re-locking.
if( preceeding[m] > maxNest[m] )
maxNest[m] = preceeding[m];
@@ -96,19 +96,19 @@ namespace mongo {
{
boost::mutex::scoped_lock lk(x);
followers[m];
- for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
if( m != i->first && i->second > 0 ) {
followers[i->first].insert(m);
- if( followers[m].count(i->first) != 0 ){
+ if( followers[m].count(i->first) != 0 ) {
failed = true;
stringstream ss;
mid bad = i->first;
ss << "mutex problem" <<
- "\n when locking " << m <<
- "\n " << bad << " was already locked and should not be."
- "\n set a and b above to debug.\n";
+ "\n when locking " << m <<
+ "\n " << bad << " was already locked and should not be."
+ "\n set a and b above to debug.\n";
stringstream q;
- for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
if( i->first != m && i->first != bad && i->second > 0 )
q << " " << i->first << '\n';
}
@@ -126,7 +126,7 @@ namespace mongo {
assert( 0 );
}
}
- void leaving(mid m) {
+ void leaving(mid m) {
if( this == 0 ) return; // still in startup pre-main()
Preceeding& preceeding = *us.get();
preceeding[m]--;
@@ -137,7 +137,7 @@ namespace mongo {
}
};
extern MutexDebugger &mutexDebugger;
-
+
// If you create a local static instance of this class, that instance will be destroyed
// before all global static objects are destroyed, so _destroyingStatics will be set
// to true before the global static variables are destroyed.
@@ -157,13 +157,13 @@ namespace mongo {
#endif
#if defined(_DEBUG)
- mutex(const char *name)
- : _name(name)
+ mutex(const char *name)
+ : _name(name)
#else
- mutex(const char *)
+ mutex(const char *)
#endif
- {
- _m = new boost::timed_mutex();
+ {
+ _m = new boost::timed_mutex();
IGNORE_OBJECT( _m ); // Turn-off heap checking on _m
}
~mutex() {
@@ -172,22 +172,22 @@ namespace mongo {
delete _m;
}
}
-
+
class try_lock : boost::noncopyable {
public:
- try_lock( mongo::mutex &m , int millis = 0 )
- : _l( m.boost() , incxtimemillis( millis ) ) ,
+ try_lock( mongo::mutex &m , int millis = 0 )
+ : _l( m.boost() , incxtimemillis( millis ) ) ,
#if BOOST_VERSION >= 103500
- ok( _l.owns_lock() )
+ ok( _l.owns_lock() )
#else
ok( _l.locked() )
#endif
{
}
- ~try_lock() {
+ ~try_lock() {
}
-
+
private:
boost::timed_mutex::scoped_timed_lock _l;
@@ -207,7 +207,7 @@ namespace mongo {
mutexDebugger.entering(mut->_name);
#endif
}
- ~scoped_lock() {
+ ~scoped_lock() {
#if defined(_DEBUG)
mutexDebugger.leaving(mut->_name);
#endif
@@ -223,7 +223,7 @@ namespace mongo {
boost::timed_mutex &boost() { return *_m; }
boost::timed_mutex *_m;
};
-
+
typedef mutex::scoped_lock scoped_lock;
typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
diff --git a/util/concurrency/mvar.h b/util/concurrency/mvar.h
index 7d17051368e..9c7a505b6d5 100644
--- a/util/concurrency/mvar.h
+++ b/util/concurrency/mvar.h
@@ -31,18 +31,18 @@ namespace mongo {
// create an empty MVar
MVar()
- : _state(EMPTY)
+ : _state(EMPTY)
{}
// creates a full MVar
MVar(const T& val)
- : _state(FULL)
- , _value(val)
+ : _state(FULL)
+ , _value(val)
{}
// puts val into the MVar and returns true or returns false if full
// never blocks
- bool tryPut(const T& val){
+ bool tryPut(const T& val) {
// intentionally repeat test before and after lock
if (_state == FULL) return false;
Mutex::scoped_lock lock(_mutex);
@@ -59,17 +59,17 @@ namespace mongo {
// puts val into the MVar
// will block if the MVar is already full
- void put(const T& val){
+ void put(const T& val) {
Mutex::scoped_lock lock(_mutex);
- while (!tryPut(val)){
- // unlocks lock while waiting and relocks before returning
+ while (!tryPut(val)) {
+ // unlocks lock while waiting and relocks before returning
_condition.wait(lock);
- }
+ }
}
// takes val out of the MVar and returns true or returns false if empty
// never blocks
- bool tryTake(T& out){
+ bool tryTake(T& out) {
// intentionally repeat test before and after lock
if (_state == EMPTY) return false;
Mutex::scoped_lock lock(_mutex);
@@ -86,14 +86,14 @@ namespace mongo {
// takes val out of the MVar
// will block if the MVar is empty
- T take(){
+ T take() {
T ret = T();
Mutex::scoped_lock lock(_mutex);
- while (!tryTake(ret)){
- // unlocks lock while waiting and relocks before returning
+ while (!tryTake(ret)) {
+ // unlocks lock while waiting and relocks before returning
_condition.wait(lock);
- }
+ }
return ret;
}
@@ -102,7 +102,7 @@ namespace mongo {
// Note: this is fast because there is no locking, but state could
// change before you get a chance to act on it.
// Mainly useful for sanity checks / asserts.
- State getState(){ return _state; }
+ State getState() { return _state; }
private:
diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h
index 2364b3a39b9..c9429c5b76d 100644
--- a/util/concurrency/rwlock.h
+++ b/util/concurrency/rwlock.h
@@ -22,14 +22,14 @@
#include "../time_support.h"
#if BOOST_VERSION >= 103500
- #define BOOST_RWLOCK
+#define BOOST_RWLOCK
#else
- #if defined(_WIN32)
- #error need boost >= 1.35 for windows
- #endif
-
- #include <pthread.h>
+#if defined(_WIN32)
+#error need boost >= 1.35 for windows
+#endif
+
+#include <pthread.h>
#endif
@@ -51,40 +51,40 @@ namespace mongo {
#else
RWLock(const char *) { }
#endif
- void lock(){
+ void lock() {
_m.lock();
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
}
- void unlock(){
+ void unlock() {
#if defined(_DEBUG)
mutexDebugger.leaving(_name);
#endif
_m.unlock();
}
-
- void lock_shared(){
+
+ void lock_shared() {
_m.lock_shared();
}
-
- void unlock_shared(){
+
+ void unlock_shared() {
_m.unlock_shared();
}
- bool lock_shared_try( int millis ){
+ bool lock_shared_try( int millis ) {
boost::system_time until = get_system_time();
until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock_shared( until ) ) {
+ if( _m.timed_lock_shared( until ) ) {
return true;
}
return false;
}
- bool lock_try( int millis = 0 ){
+ bool lock_try( int millis = 0 ) {
boost::system_time until = get_system_time();
until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock( until ) ) {
+ if( _m.timed_lock( until ) ) {
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
@@ -99,7 +99,7 @@ namespace mongo {
class RWLock {
pthread_rwlock_t _lock;
- inline void check( int x ){
+ inline void check( int x ) {
if( x == 0 )
return;
log() << "pthread rwlock failed: " << x << endl;
@@ -115,40 +115,40 @@ namespace mongo {
#endif
check( pthread_rwlock_init( &_lock , 0 ) );
}
-
- ~RWLock(){
- if ( ! StaticObserver::_destroyingStatics ){
+
+ ~RWLock() {
+ if ( ! StaticObserver::_destroyingStatics ) {
check( pthread_rwlock_destroy( &_lock ) );
}
}
- void lock(){
+ void lock() {
check( pthread_rwlock_wrlock( &_lock ) );
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
}
- void unlock(){
+ void unlock() {
#if defined(_DEBUG)
mutexDebugger.leaving(_name);
#endif
check( pthread_rwlock_unlock( &_lock ) );
}
-
- void lock_shared(){
+
+ void lock_shared() {
check( pthread_rwlock_rdlock( &_lock ) );
}
-
- void unlock_shared(){
+
+ void unlock_shared() {
check( pthread_rwlock_unlock( &_lock ) );
}
-
- bool lock_shared_try( int millis ){
+
+ bool lock_shared_try( int millis ) {
return _try( millis , false );
}
- bool lock_try( int millis = 0 ){
- if( _try( millis , true ) ) {
+ bool lock_try( int millis = 0 ) {
+ if( _try( millis , true ) ) {
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
@@ -157,31 +157,31 @@ namespace mongo {
return false;
}
- bool _try( int millis , bool write ){
+ bool _try( int millis , bool write ) {
while ( true ) {
- int x = write ?
- pthread_rwlock_trywrlock( &_lock ) :
- pthread_rwlock_tryrdlock( &_lock );
-
+ int x = write ?
+ pthread_rwlock_trywrlock( &_lock ) :
+ pthread_rwlock_tryrdlock( &_lock );
+
if ( x <= 0 ) {
return true;
}
-
+
if ( millis-- <= 0 )
return false;
-
- if ( x == EBUSY ){
+
+ if ( x == EBUSY ) {
sleepmillis(1);
continue;
}
check(x);
- }
-
+ }
+
return false;
}
};
-
+
#endif
@@ -190,7 +190,7 @@ namespace mongo {
public:
struct exception { };
rwlock_try_write(RWLock& l, int millis = 0) : _l(l) {
- if( !l.lock_try(millis) )
+ if( !l.lock_try(millis) )
throw exception();
}
~rwlock_try_write() { _l.unlock(); }
@@ -216,7 +216,7 @@ namespace mongo {
else
_lock.unlock_shared();
}
- private:
+ private:
RWLock& _lock;
const bool _write;
};
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp
index 2e56acb47b3..0f33609d645 100644
--- a/util/concurrency/spin_lock.cpp
+++ b/util/concurrency/spin_lock.cpp
@@ -22,7 +22,7 @@
namespace mongo {
- SpinLock::~SpinLock() {
+ SpinLock::~SpinLock() {
#if defined(_WIN32)
DeleteCriticalSection(&_cs);
#endif
@@ -30,14 +30,14 @@ namespace mongo {
SpinLock::SpinLock()
#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- : _locked( false ) { }
+ : _locked( false ) { }
#elif defined(_WIN32)
{ InitializeCriticalSectionAndSpinCount(&_cs, 4000); }
#else
: _mutex( "SpinLock" ) { }
#endif
- void SpinLock::lock(){
+ void SpinLock::lock() {
#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
// fast path
if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
@@ -65,17 +65,17 @@ namespace mongo {
#endif
}
- void SpinLock::unlock(){
+ void SpinLock::unlock() {
#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
__sync_lock_release(&_locked);
-#elif defined(WIN32)
+#elif defined(WIN32)
LeaveCriticalSection(&_cs);
#else
-
+
_mutex.unlock();
#endif
diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h
index 7324a6e4f32..d5360f7b3c6 100644
--- a/util/concurrency/spin_lock.h
+++ b/util/concurrency/spin_lock.h
@@ -48,7 +48,7 @@ namespace mongo {
// Non-copyable, non-assignable
SpinLock(SpinLock&);
SpinLock& operator=(SpinLock&);
- };
+ };
} // namespace mongo
diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp
index 5639c888a32..12e2894ef1a 100644
--- a/util/concurrency/synchronization.cpp
+++ b/util/concurrency/synchronization.cpp
@@ -22,15 +22,15 @@ namespace mongo {
Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { }
- Notification::~Notification(){ }
+ Notification::~Notification() { }
- void Notification::waitToBeNotified(){
+ void Notification::waitToBeNotified() {
scoped_lock lock( _mutex );
while ( ! _notified )
_condition.wait( lock.boost() );
}
- void Notification::notifyOne(){
+ void Notification::notifyOne() {
scoped_lock lock( _mutex );
assert( !_notified );
_notified = true;
@@ -38,7 +38,7 @@ namespace mongo {
}
NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { }
-
+
void NotifyAll::wait() {
scoped_lock lock( _mutex );
unsigned long long old = _counter;
diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h
index c2e70cabe19..ac2fcabcb86 100644
--- a/util/concurrency/synchronization.h
+++ b/util/concurrency/synchronization.h
@@ -52,12 +52,12 @@ namespace mongo {
/** establishes a synchronization point between threads. N threads are waits and one is notifier.
threadsafe.
*/
- class NotifyAll : boost::noncopyable {
+ class NotifyAll : boost::noncopyable {
public:
NotifyAll();
- /** awaits the next notifyAll() call by another thread. notifications that precede this
- call are ignored -- we are looking for a fresh event.
+ /** awaits the next notifyAll() call by another thread. notifications that precede this
+ call are ignored -- we are looking for a fresh event.
*/
void wait();
diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp
index 20801f10217..d84cd71dceb 100644
--- a/util/concurrency/task.cpp
+++ b/util/concurrency/task.cpp
@@ -25,11 +25,11 @@
#include "../unittest.h"
#include "../time_support.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
- /*void foo() {
+ /*void foo() {
boost::mutex m;
boost::mutex::scoped_lock lk(m);
boost::condition cond;
@@ -37,21 +37,21 @@ namespace mongo {
cond.notify_one();
}*/
- Task::Task()
- : BackgroundJob( true /* deleteSelf */ ) {
+ Task::Task()
+ : BackgroundJob( true /* deleteSelf */ ) {
n = 0;
repeat = 0;
}
void Task::halt() { repeat = 0; }
- void Task::run() {
+ void Task::run() {
assert( n == 0 );
while( 1 ) {
n++;
- try {
+ try {
doWork();
- }
+ }
catch(...) { }
if( repeat == 0 )
break;
@@ -65,11 +65,11 @@ namespace mongo {
go();
}
- void fork(Task *t) {
+ void fork(Task *t) {
t->begin();
}
- void repeat(Task *t, unsigned millis) {
+ void repeat(Task *t, unsigned millis) {
t->repeat = millis;
t->begin();
}
@@ -110,7 +110,7 @@ namespace mongo {
}
}
- void Server::send( lam msg ) {
+ void Server::send( lam msg ) {
{
boost::mutex::scoped_lock lk(m);
d.push_back(msg);
@@ -118,9 +118,9 @@ namespace mongo {
c.notify_one();
}
- void Server::doWork() {
+ void Server::doWork() {
starting();
- while( 1 ) {
+ while( 1 ) {
lam f;
try {
boost::mutex::scoped_lock lk(m);
@@ -129,7 +129,7 @@ namespace mongo {
f = d.front();
d.pop_front();
}
- catch(...) {
+ catch(...) {
log() << "ERROR exception in Server:doWork?" << endl;
}
try {
@@ -141,27 +141,28 @@ namespace mongo {
d.push_back(f);
}
}
- } catch(std::exception& e) {
- log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl;
- }
- catch(const char *p) {
- log() << "Server::doWork task:" << name() << " unknown c exception:" <<
- ((p&&strlen(p)<800)?p:"?") << endl;
- }
- catch(...) {
- log() << "Server::doWork unknown exception task:" << name() << endl;
+ }
+ catch(std::exception& e) {
+ log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl;
+ }
+ catch(const char *p) {
+ log() << "Server::doWork task:" << name() << " unknown c exception:" <<
+ ((p&&strlen(p)<800)?p:"?") << endl;
+ }
+ catch(...) {
+ log() << "Server::doWork unknown exception task:" << name() << endl;
}
}
}
static Server *s;
- static void abc(int i) {
+ static void abc(int i) {
cout << "Hello " << i << endl;
s->requeue();
}
class TaskUnitTest : public mongo::UnitTest {
public:
- virtual void run() {
+ virtual void run() {
lam f = boost::bind(abc, 3);
//f();
diff --git a/util/concurrency/task.h b/util/concurrency/task.h
index 654ecd35fd2..d7b45eeef24 100644
--- a/util/concurrency/task.h
+++ b/util/concurrency/task.h
@@ -20,9 +20,9 @@
#include "../background.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
/** abstraction around threads. simpler than BackgroundJob which is used behind the scenes.
allocate the Task dynamically. when the thread terminates, the Task object will delete itself.
@@ -34,7 +34,7 @@ namespace mongo {
public:
Task();
- /** for a repeating task, stop after current invocation ends. can be called by other threads
+ /** for a repeating task, stop after current invocation ends. can be called by other threads
as long as the Task is still in scope.
*/
void halt();
@@ -54,8 +54,8 @@ namespace mongo {
void repeat(Task *t, unsigned millis);
/*** Example ***
- inline void sample() {
- class Sample : public Task {
+ inline void sample() {
+ class Sample : public Task {
public:
int result;
virtual void doWork() { result = 1234; }
diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp
index 2caac1ff3f3..1c258847cb5 100644
--- a/util/concurrency/thread_pool.cpp
+++ b/util/concurrency/thread_pool.cpp
@@ -20,8 +20,8 @@
#include "thread_pool.h"
#include "mvar.h"
-namespace mongo{
- namespace threadpool{
+namespace mongo {
+ namespace threadpool {
// Worker thread
class Worker : boost::noncopyable {
@@ -34,12 +34,12 @@ namespace mongo{
// destructor will block until current operation is completed
// Acts as a "join" on this thread
- ~Worker(){
+ ~Worker() {
_task.put(Task());
_thread.join();
}
- void set_task(Task& func){
+ void set_task(Task& func) {
assert(!func.empty());
assert(_is_done);
_is_done = false;
@@ -47,13 +47,13 @@ namespace mongo{
_task.put(func);
}
- private:
+ private:
ThreadPool& _owner;
MVar<Task> _task;
bool _is_done; // only used for error detection
boost::thread _thread;
- void loop(){
+ void loop() {
while (true) {
Task task = _task.take();
if (task.empty())
@@ -61,9 +61,11 @@ namespace mongo{
try {
task();
- } catch (std::exception e){
+ }
+ catch (std::exception e) {
log() << "Unhandled exception in worker thread: " << e.what() << endl;;
- } catch (...){
+ }
+ catch (...) {
log() << "Unhandled non-exception in worker thread" << endl;
}
_is_done = true;
@@ -74,16 +76,15 @@ namespace mongo{
ThreadPool::ThreadPool(int nThreads)
: _mutex("ThreadPool"), _tasksRemaining(0)
- , _nThreads(nThreads)
- {
+ , _nThreads(nThreads) {
scoped_lock lock(_mutex);
- while (nThreads-- > 0){
+ while (nThreads-- > 0) {
Worker* worker = new Worker(*this);
_freeWorkers.push_front(worker);
}
}
- ThreadPool::~ThreadPool(){
+ ThreadPool::~ThreadPool() {
join();
assert(_tasks.empty());
@@ -91,40 +92,42 @@ namespace mongo{
// O(n) but n should be small
assert(_freeWorkers.size() == (unsigned)_nThreads);
- while(!_freeWorkers.empty()){
+ while(!_freeWorkers.empty()) {
delete _freeWorkers.front();
_freeWorkers.pop_front();
}
}
- void ThreadPool::join(){
+ void ThreadPool::join() {
scoped_lock lock(_mutex);
- while(_tasksRemaining){
+ while(_tasksRemaining) {
_condition.wait(lock.boost());
}
}
- void ThreadPool::schedule(Task task){
+ void ThreadPool::schedule(Task task) {
scoped_lock lock(_mutex);
_tasksRemaining++;
- if (!_freeWorkers.empty()){
+ if (!_freeWorkers.empty()) {
_freeWorkers.front()->set_task(task);
_freeWorkers.pop_front();
- }else{
+ }
+ else {
_tasks.push_back(task);
}
}
// should only be called by a worker from the worker thread
- void ThreadPool::task_done(Worker* worker){
+ void ThreadPool::task_done(Worker* worker) {
scoped_lock lock(_mutex);
- if (!_tasks.empty()){
+ if (!_tasks.empty()) {
worker->set_task(_tasks.front());
_tasks.pop_front();
- }else{
+ }
+ else {
_freeWorkers.push_front(worker);
}
diff --git a/util/concurrency/thread_pool.h b/util/concurrency/thread_pool.h
index 31e06430088..b348ed1d01b 100644
--- a/util/concurrency/thread_pool.h
+++ b/util/concurrency/thread_pool.h
@@ -24,59 +24,59 @@
namespace mongo {
-namespace threadpool {
- class Worker;
-
- typedef boost::function<void(void)> Task; //nullary function or functor
-
- // exported to the mongo namespace
- class ThreadPool : boost::noncopyable{
- public:
- explicit ThreadPool(int nThreads=8);
-
- // blocks until all tasks are complete (tasks_remaining() == 0)
- // You should not call schedule while in the destructor
- ~ThreadPool();
-
- // blocks until all tasks are complete (tasks_remaining() == 0)
- // does not prevent new tasks from being scheduled so could wait forever.
- // Also, new tasks could be scheduled after this returns.
- void join();
-
- // task will be copied a few times so make sure it's relatively cheap
- void schedule(Task task);
-
- // Helpers that wrap schedule and boost::bind.
- // Functor and args will be copied a few times so make sure it's relatively cheap
- template<typename F, typename A>
- void schedule(F f, A a){ schedule(boost::bind(f,a)); }
- template<typename F, typename A, typename B>
- void schedule(F f, A a, B b){ schedule(boost::bind(f,a,b)); }
- template<typename F, typename A, typename B, typename C>
- void schedule(F f, A a, B b, C c){ schedule(boost::bind(f,a,b,c)); }
- template<typename F, typename A, typename B, typename C, typename D>
- void schedule(F f, A a, B b, C c, D d){ schedule(boost::bind(f,a,b,c,d)); }
- template<typename F, typename A, typename B, typename C, typename D, typename E>
- void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); }
-
- int tasks_remaining() { return _tasksRemaining; }
-
- private:
- mongo::mutex _mutex;
- boost::condition _condition;
-
- list<Worker*> _freeWorkers; //used as LIFO stack (always front)
- list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
- int _tasksRemaining; // in queue + currently processing
- int _nThreads; // only used for sanity checking. could be removed in the future.
-
- // should only be called by a worker from the worker's thread
- void task_done(Worker* worker);
- friend class Worker;
- };
-
-} //namespace threadpool
-
-using threadpool::ThreadPool;
+ namespace threadpool {
+ class Worker;
+
+ typedef boost::function<void(void)> Task; //nullary function or functor
+
+ // exported to the mongo namespace
+ class ThreadPool : boost::noncopyable {
+ public:
+ explicit ThreadPool(int nThreads=8);
+
+ // blocks until all tasks are complete (tasks_remaining() == 0)
+ // You should not call schedule while in the destructor
+ ~ThreadPool();
+
+ // blocks until all tasks are complete (tasks_remaining() == 0)
+ // does not prevent new tasks from being scheduled so could wait forever.
+ // Also, new tasks could be scheduled after this returns.
+ void join();
+
+ // task will be copied a few times so make sure it's relatively cheap
+ void schedule(Task task);
+
+ // Helpers that wrap schedule and boost::bind.
+ // Functor and args will be copied a few times so make sure it's relatively cheap
+ template<typename F, typename A>
+ void schedule(F f, A a) { schedule(boost::bind(f,a)); }
+ template<typename F, typename A, typename B>
+ void schedule(F f, A a, B b) { schedule(boost::bind(f,a,b)); }
+ template<typename F, typename A, typename B, typename C>
+ void schedule(F f, A a, B b, C c) { schedule(boost::bind(f,a,b,c)); }
+ template<typename F, typename A, typename B, typename C, typename D>
+ void schedule(F f, A a, B b, C c, D d) { schedule(boost::bind(f,a,b,c,d)); }
+ template<typename F, typename A, typename B, typename C, typename D, typename E>
+ void schedule(F f, A a, B b, C c, D d, E e) { schedule(boost::bind(f,a,b,c,d,e)); }
+
+ int tasks_remaining() { return _tasksRemaining; }
+
+ private:
+ mongo::mutex _mutex;
+ boost::condition _condition;
+
+ list<Worker*> _freeWorkers; //used as LIFO stack (always front)
+ list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
+ int _tasksRemaining; // in queue + currently processing
+ int _nThreads; // only used for sanity checking. could be removed in the future.
+
+ // should only be called by a worker from the worker's thread
+ void task_done(Worker* worker);
+ friend class Worker;
+ };
+
+ } //namespace threadpool
+
+ using threadpool::ThreadPool;
} //namespace mongo
diff --git a/util/concurrency/value.h b/util/concurrency/value.h
index dabeb956e43..08d53062bf6 100644
--- a/util/concurrency/value.h
+++ b/util/concurrency/value.h
@@ -20,11 +20,11 @@
#pragma once
-namespace mongo {
+namespace mongo {
extern mutex _atomicMutex;
- /** atomic wrapper for a value. enters a mutex on each access. must
+ /** atomic wrapper for a value. enters a mutex on each access. must
be copyable.
*/
template<typename T>
@@ -33,20 +33,22 @@ namespace mongo {
public:
Atomic<T>() { }
- void operator=(const T& a) {
+ void operator=(const T& a) {
scoped_lock lk(_atomicMutex);
- val = a; }
+ val = a;
+ }
- operator T() const {
+ operator T() const {
scoped_lock lk(_atomicMutex);
- return val; }
-
+ return val;
+ }
+
/** example:
Atomic<int> q;
...
{
Atomic<int>::tran t(q);
- if( q.ref() > 0 )
+ if( q.ref() > 0 )
q.ref()--;
}
*/
@@ -58,11 +60,11 @@ namespace mongo {
};
};
- /** this string COULD be mangled but with the double buffering, assuming writes
- are infrequent, it's unlikely. thus, this is reasonable for lockless setting of
+ /** this string COULD be mangled but with the double buffering, assuming writes
+ are infrequent, it's unlikely. thus, this is reasonable for lockless setting of
diagnostic strings, where their content isn't critical.
*/
- class DiagStr {
+ class DiagStr {
char buf1[256];
char buf2[256];
char *p;
diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp
index 0bf52ec048e..3d057a4801e 100644
--- a/util/concurrency/vars.cpp
+++ b/util/concurrency/vars.cpp
@@ -20,28 +20,28 @@
#include "value.h"
#include "mutex.h"
-namespace mongo {
+namespace mongo {
mongo::mutex _atomicMutex("_atomicMutex");
// intentional leak. otherwise destructor orders can be problematic at termination.
MutexDebugger &mutexDebugger = *(new MutexDebugger());
- MutexDebugger::MutexDebugger() :
- x( *(new boost::mutex()) ), magic(0x12345678) {
- // optional way to debug lock order
- /*
- a = "a_lock";
- b = "b_lock";
- */
+ MutexDebugger::MutexDebugger() :
+ x( *(new boost::mutex()) ), magic(0x12345678) {
+ // optional way to debug lock order
+ /*
+ a = "a_lock";
+ b = "b_lock";
+ */
}
- void MutexDebugger::programEnding() {
+ void MutexDebugger::programEnding() {
if( logLevel>=1 && followers.size() ) {
std::cout << followers.size() << " mutexes in program" << endl;
- for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) {
+ for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) {
cout << i->first;
- if( maxNest[i->first] > 1 )
+ if( maxNest[i->first] > 1 )
cout << " maxNest:" << maxNest[i->first];
cout << '\n';
for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ )